ATLAS Offline Software
Loading...
Searching...
No Matches
Worker.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7
8//
9// includes
10//
11
12#include <EventLoop/Worker.h>
13
16#include <EventLoop/BatchJob.h>
19#include <EventLoop/Driver.h>
21#include <EventLoop/Job.h>
23#include <EventLoop/Module.h>
39#include <TFile.h>
40#include <TH1.h>
41#include <TROOT.h>
42#include <TSystem.h>
43#include <TTree.h>
44#include <TObjString.h>
45#include <fstream>
46#include <memory>
47#include <exception>
48
49//
50// method implementations
51//
52
53namespace EL
54{
55 namespace
56 {
57 StatusCode make_module (std::unique_ptr<Detail::Module>& module, asg::AsgComponentConfig config)
58 {
59 using namespace msgEventLoop;
60 ANA_MSG_DEBUG ("making EventLoop module of type " + config.type());
61 ANA_CHECK (config.makeComponentExpert (module, "new %1% (\"%2%\")", false, "ELModule."));
62 ANA_MSG_DEBUG ("Created EventLoop module of type " << config.type());
63 return StatusCode::SUCCESS;
64 }
65 }
66
67
68
69 void Worker ::
70 testInvariant () const
71 {
72 RCU_INVARIANT (this != nullptr);
73 for (std::size_t iter = 0, end = m_algs.size(); iter != end; ++ iter)
74 {
75 RCU_INVARIANT (m_algs[iter].m_algorithm != nullptr);
76 }
77 }
78
79
80
// Destructor.
// NOTE(review): this listing omits original source line 84, so the body
// shown here may be incomplete — confirm against the real file.
81 Worker ::
82 ~Worker ()
83 {
85 }
86
87
88
// Add an object to the output.  The pointer is handed over to this
// function: it is wrapped in a unique_ptr and then moved into
// ModuleData::addOutput.
// NOTE(review): this listing omits original source line 94 inside the
// body — confirm against the real file.
89 void Worker ::
90 addOutput (TObject *output_swallow)
91 {
// hold the object in a smart pointer before any check can fail
92 std::unique_ptr<TObject> output (output_swallow);
93
95 RCU_REQUIRE_SOFT (output_swallow != 0);
96
// presumably detaches the object from any ROOT directory — TODO confirm
97 RCU::SetDirectory (output_swallow, 0);
98 ModuleData::addOutput (std::move (output));
99 }
100
101
102
// Add a given object to the output, wrapped in a TList.  A new TList
// carrying the given name is created, the object becomes its only entry,
// and the list is then passed on to addOutput().
// NOTE(review): this listing omits original source line 108 inside the
// body — confirm against the real file.
103 void Worker ::
104 addOutputList (const std::string& name, TObject *output_swallow)
105 {
// hold the object in a smart pointer before any check can fail
106 std::unique_ptr<TObject> output (output_swallow);
107
109 RCU_REQUIRE_SOFT (output_swallow != 0);
110
111 RCU::SetDirectory (output_swallow, 0);
112 std::unique_ptr<TList> list (new TList);
113 list->SetName (name.c_str());
// both smart pointers give up ownership here: the object goes into the
// list, and the list goes to addOutput()
114 list->Add (output.release());
115 addOutput (list.release());
116 }
117
118
119
120 TObject *Worker ::
121 getOutputHist (const std::string& name) const
122 {
123 RCU_READ_INVARIANT (this);
124
125 TObject *result = m_histOutput->getOutputHist (name);
126 if (result == nullptr) RCU_THROW_MSG ("unknown output histogram: " + name);
127 return result;
128 }
129
130
131
132 TFile *Worker ::
133 getOutputFile (const std::string& label) const
134 {
135 RCU_READ_INVARIANT (this);
136 TFile *result = getOutputFileNull (label);
137 if (result == 0)
138 RCU_THROW_MSG ("no output dataset defined with label: " + label);
139 return result;
140 }
141
142
143
144 TFile *Worker ::
145 getOutputFileNull (const std::string& label) const
146 {
147 RCU_READ_INVARIANT (this);
148 auto iter = m_outputs.find (label);
149 if (iter == m_outputs.end())
150 return 0;
151 return iter->second->file();
152 }
153
154
155
// Add a tree to the output file identified by the stream name.
// Returns FAILURE (after logging an error) when m_outputs has no entry
// for the stream, otherwise hands the tree to addClone() on that output
// and returns SUCCESS.
// NOTE(review): this listing omits original source line 156, which
// presumably holds the return type and class qualifier — confirm against
// the real file.
157 addTree( const TTree& tree, const std::string& stream )
158 {
159 using namespace msgEventLoop;
160 RCU_READ_INVARIANT( this );
161
162 auto outputIter = m_outputs.find (stream);
163 if (outputIter == m_outputs.end())
164 {
165 ANA_MSG_ERROR ( "No output file with stream name \"" + stream +
166 "\" found" );
167 return ::StatusCode::FAILURE;
168 }
169
170 outputIter->second->addClone (tree);
171
172 // Return gracefully:
173 return ::StatusCode::SUCCESS;
174 }
175
176
177
178 TTree *Worker::
179 getOutputTree( const std::string& name, const std::string& stream ) const
180 {
181 using namespace msgEventLoop;
182 RCU_READ_INVARIANT( this );
183
184 auto outputIter = m_outputs.find (stream);
185 if (outputIter == m_outputs.end())
186 {
187 RCU_THROW_MSG ( "No output file with stream name \"" + stream
188 + "\" found" );
189 }
190
191 TTree *result = outputIter->second->getOutputTree( name );
192 if( result == nullptr ) {
193 RCU_THROW_MSG ( "No tree with name \"" + name + "\" in stream \"" +
194 stream + "\"" );
195 }
196 return result;
197 }
198
199
200
201 const SH::MetaObject *Worker ::
202 metaData () const
203 {
204 RCU_READ_INVARIANT (this);
205 return m_metaData;
206 }
207
208
209
210 TTree *Worker ::
211 tree () const
212 {
213 RCU_READ_INVARIANT (this);
214 return m_inputTree;
215 }
216
217
218
219 Long64_t Worker ::
220 treeEntry () const
221 {
222 RCU_READ_INVARIANT (this);
223 return m_inputEntry;
224 }
225
226
227
228 TFile *Worker ::
229 inputFile () const
230 {
231 RCU_READ_INVARIANT (this);
232 return m_inputFile.get();
233 }
234
235
236 bool Worker ::
237 hasInputEvents () const
238 {
239 // no invariant used
240 return m_hasInputEvents;
241 }
242
243
244 std::string Worker ::
245 inputFileName () const
246 {
247 // no invariant used
248 std::string path = inputFile()->GetName();
249 auto split = path.rfind ('/');
250 if (split != std::string::npos)
251 return path.substr (split + 1);
252 else
253 return path;
254 }
255
256
257
258 TTree *Worker ::
259 triggerConfig () const
260 {
261 RCU_READ_INVARIANT (this);
262 return dynamic_cast<TTree*>(inputFile()->Get("physicsMeta/TrigConfTree"));
263 }
264
265
266
267 xAOD::Event *Worker ::
268 xaodEvent () const
269 {
270 RCU_READ_INVARIANT (this);
271
272 if (m_event == nullptr)
273 RCU_THROW_MSG ("Job not configured for xAOD support");
274 return m_event;
275 }
276
277
278
279 xAOD::TStore *Worker ::
280 xaodStore () const
281 {
282 RCU_READ_INVARIANT (this);
283
284 if (m_tstore == nullptr)
285 RCU_THROW_MSG ("Job not configured for xAOD support");
286 return m_tstore;
287 }
288
289
290
291 Algorithm *Worker ::
292 getAlg (const std::string& name) const
293 {
294 RCU_READ_INVARIANT (this);
295 for (auto& alg : m_algs)
296 {
297 if (alg->hasName (name))
298 return alg.m_algorithm->getLegacyAlg();
299 }
300 return 0;
301 }
302
303
304
// Mark the current event as skipped by setting m_skipEvent.
// NOTE(review): this listing omits original source line 308 inside the
// body — confirm against the real file.
305 void Worker ::
306 skipEvent ()
307 {
309 m_skipEvent = true;
310 }
311
312
313
314 bool Worker ::
315 filterPassed () const noexcept
316 {
317 RCU_READ_INVARIANT (this);
318 return !m_skipEvent;
319 }
320
321
322
// Set the filter decision for the current event; stored inverted in
// m_skipEvent.
// NOTE(review): this listing omits original source line 326 inside the
// body — confirm against the real file.
323 void Worker ::
324 setFilterPassed (bool val_filterPassed) noexcept
325 {
327 m_skipEvent = !val_filterPassed;
328 }
329
330
331
332 Worker ::
333 Worker ()
334 {
335 m_worker = this;
336
337 RCU_NEW_INVARIANT (this);
338 }
339
340
341
// Set the meta-data object.  The pointer is stored as-is in m_metaData
// (no copy is made here) and is required to be non-null.
// NOTE(review): this listing omits original source line 345 inside the
// body — confirm against the real file.
342 void Worker ::
343 setMetaData (const SH::MetaObject *val_metaData)
344 {
346 RCU_REQUIRE (val_metaData != 0);
347
348 m_metaData = val_metaData;
349 }
350
351
352
// Set the histogram output target; the value is stored in m_outputTarget.
// NOTE(review): this listing omits original source line 356 inside the
// body — confirm against the real file.
353 void Worker ::
354 setOutputHist (const std::string& val_outputTarget)
355 {
357
358 m_outputTarget = val_outputTarget;
359 }
360
361
362
// Set the name of the segment we are processing; stored in m_segmentName.
// NOTE(review): this listing omits original source line 366 inside the
// body — confirm against the real file.
363 void Worker ::
364 setSegmentName (const std::string& val_segmentName)
365 {
367
368 m_segmentName = val_segmentName;
369 }
370
371
372
// Set the JobConfig: every algorithm returned by
// jobConfig.extractAlgorithms() is moved to the end of m_algs.
// NOTE(review): this listing omits original source line 376 inside the
// body — confirm against the real file.
373 void Worker ::
374 setJobConfig (JobConfig&& jobConfig)
375 {
377 for (auto& alg : jobConfig.extractAlgorithms())
378 {
379 m_algs.push_back (std::move (alg));
380 }
381 }
382
383
384
// Initialize the worker.
//
// Visible steps, in order:
//  1. append module configurations to m_moduleConfig, driven by options
//     read from the meta-data object;
//  2. create one module per configuration via make_module();
//  3. handle the case where no histogram stream is registered yet;
//  4. register stream aliases from Job::optStreamAliases;
//  5. create the job statistics tree;
//  6. call firstInitialize and then preFileInitialize on all modules.
//
// Returns FAILURE if any ANA_CHECK fails or if an alias is malformed,
// refers to an unknown stream, or clashes with an existing stream name.
//
// NOTE(review): this listing omits original source lines 389, 446-447,
// 449 and 451-452, so step 3 in particular is only partly visible —
// confirm against the real file.
385 ::StatusCode Worker ::
386 initialize ()
387 {
388 using namespace msgEventLoop;
390
391 const bool xAODInput = m_metaData->castBool (Job::optXAODInput, false);
392
393 ANA_MSG_INFO ("xAODInput = " << xAODInput);
394
// modules are created below in the order their configurations are
// appended here; the memory monitor is added once here and once at the end
395 if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
396 m_moduleConfig.emplace_back ("EL::Detail::MemoryMonitorModule/EarlyMemoryMonitorModule");
// the tree cache module is only configured for a positive cache size
397 if (auto cacheSize = metaData()->castDouble (Job::optCacheSize, 0); cacheSize > 0)
398 {
399 m_moduleConfig.emplace_back ("EL::Detail::TreeCacheModule/TreeCacheModule");
400 ANA_CHECK (m_moduleConfig.back().setProperty ("cacheSize", Long64_t (cacheSize)));
401 ANA_CHECK (m_moduleConfig.back().setProperty ("cacheLearnEntries", Long64_t (metaData()->castInteger (Job::optCacheLearnEntries, 0))));
402 ANA_CHECK (m_moduleConfig.back().setProperty ("printPerFileStats", metaData()->castBool (Job::optPrintPerFileStats, false)));
403 }
404 if (xAODInput)
405 {
406 m_moduleConfig.emplace_back ("EL::Detail::EventModule/EventModule");
// summaryReport is only set (to false) when the option is explicitly 0
407 if (metaData()->castDouble (Job::optXAODSummaryReport, 1) == 0)
408 ANA_CHECK (m_moduleConfig.back().setProperty ("summaryReport", false));
409 ANA_CHECK (m_moduleConfig.back().setProperty ("useStats", metaData()->castBool (Job::optXAODPerfStats, false)));
410 }
411 auto factoryPreload = metaData()->castString (Job::optFactoryPreload, "");
412 if (!factoryPreload.empty())
413 {
414 m_moduleConfig.emplace_back ("EL::Detail::FactoryPreloadModule/FactoryPreloadModule");
415 ANA_CHECK (m_moduleConfig.back().setProperty ("preloader", factoryPreload));
416 }
// the leak check module is always added; its limits come from the meta-data
417 m_moduleConfig.emplace_back ("EL::Detail::LeakCheckModule/LeakCheckModule");
418 ANA_CHECK (m_moduleConfig.back().setProperty ("failOnLeak", metaData()->castBool (Job::optMemFailOnLeak, false)));
419 ANA_CHECK (m_moduleConfig.back().setProperty ("absResidentLimit", metaData()->castInteger (Job::optMemResidentIncreaseLimit, 10000)));
420 ANA_CHECK (m_moduleConfig.back().setProperty ("absVirtualLimit", metaData()->castInteger (Job::optMemVirtualIncreaseLimit, 0)));
421 ANA_CHECK (m_moduleConfig.back().setProperty ("perEvResidentLimit", metaData()->castInteger (Job::optMemResidentPerEventIncreaseLimit, 10)));
422 ANA_CHECK (m_moduleConfig.back().setProperty ("perEvVirtualLimit", metaData()->castInteger (Job::optMemVirtualPerEventIncreaseLimit, 0)));
423 m_moduleConfig.emplace_back ("EL::Detail::StopwatchModule/StopwatchModule");
424 if (metaData()->castBool (Job::optGridReporting, false))
425 m_moduleConfig.emplace_back ("EL::Detail::GridReportingModule/GridReportingModule");
426 if (metaData()->castBool (Job::optAlgorithmTimer, false))
427 m_moduleConfig.emplace_back ("EL::Detail::AlgorithmTimerModule/AlgorithmTimerModule");
428 if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
429 m_moduleConfig.emplace_back ("EL::Detail::AlgorithmMemoryModule/AlgorithmMemoryModule");
430 m_moduleConfig.emplace_back ("EL::Detail::FileExecutedModule/FileExecutedModule");
431 m_moduleConfig.emplace_back ("EL::Detail::EventCountModule/EventCountModule");
432 m_moduleConfig.emplace_back ("EL::Detail::WorkerConfigModule/WorkerConfigModule");
433 m_moduleConfig.emplace_back ("EL::Detail::AlgorithmStateModule/AlgorithmStateModule");
434 m_moduleConfig.emplace_back ("EL::Detail::PostClosedOutputsModule/PostClosedOutputsModule");
435 if (metaData()->castBool (Job::optAlgorithmMemoryMonitor, false))
436 m_moduleConfig.emplace_back ("EL::Detail::MemoryMonitorModule/LateMemoryMonitorModule");
437
// instantiate the modules, preserving the configuration order
438 for (const auto& config : m_moduleConfig)
439 {
440 std::unique_ptr<Detail::Module> module;
441 ANA_CHECK (make_module (module, config));
442 m_modules.push_back (std::move (module));
443 }
444
// NOTE(review): the statements in this branch are partly missing from the
// listing; the visible fragment builds a path
// "<m_outputTarget>/hist-<m_segmentName>.root" with mode "RECREATE"
445 if (m_outputs.find (Job::histogramStreamName) == m_outputs.end())
446 {
448 m_outputTarget + "/hist-" + m_segmentName + ".root", "RECREATE"};
450 }
453 if (auto aliases = metaData()->castString (Job::optStreamAliases, ""); !aliases.empty())
454 {
455 // the format of aliases is "alias1=realname1,alias2=realname2"
456 std::istringstream iss (aliases);
457 std::string alias;
458 while (std::getline (iss, alias, ','))
459 {
460 auto pos = alias.find ('=');
461 if (pos == std::string::npos)
462 {
463 ANA_MSG_ERROR ("Invalid alias format: " << alias);
464 return ::StatusCode::FAILURE;
465 }
466 auto aliasName = alias.substr (0, pos);
467 auto realName = alias.substr (pos + 1);
468 auto realOutput = m_outputs.find (realName);
469 if (realOutput == m_outputs.end())
470 {
471 ANA_MSG_ERROR ("output stream " << realName << " not found for alias " << aliasName);
472 return ::StatusCode::FAILURE;
473 }
// the alias entry refers to the same output object as the real stream
474 auto [aliasOutput, success] = m_outputs.emplace (aliasName, realOutput->second);
475 if (!success)
476 {
477 ANA_MSG_ERROR ("output stream " << aliasName << " already exists, can't make alias");
478 return ::StatusCode::FAILURE;
479 }
480 }
481 }
482
483 m_jobStats = std::make_unique<TTree>
484 ("EventLoop_JobStats", "EventLoop job statistics");
// keep the tree detached from any directory
485 m_jobStats->SetDirectory (nullptr);
486
487 ANA_MSG_INFO ("calling firstInitialize on all modules");
488 for (auto& module : m_modules)
489 ANA_CHECK (module->firstInitialize (*this));
490 ANA_MSG_INFO ("calling preFileInitialize on all modules");
491 for (auto& module : m_modules)
492 ANA_CHECK (module->preFileInitialize (*this));
493
494 return ::StatusCode::SUCCESS;
495 }
496
497
498
// Call processInputs() on every module in order, passing this worker for
// both arguments.  Returns FAILURE as soon as one ANA_CHECK fails.
// NOTE(review): this listing omits original source line 504 inside the
// body — confirm against the real file.
499 ::StatusCode Worker ::
500 processInputs ()
501 {
502 using namespace msgEventLoop;
503
505
506 for (auto& module : m_modules)
507 ANA_CHECK (module->processInputs (*this, *this));
508
509 return ::StatusCode::SUCCESS;
510 }
511
512
513
// Finalize the worker.
//
// Fails immediately if the algorithms were never initialized.  Otherwise
// the visible sequence is: onFinalize on all modules, save and close
// every non-histogram output stream, postFinalize on all modules, publish
// the job statistics tree if it has branches, save the histogram output,
// onWorkerEnd on all modules, save and close the histogram output, and
// finally postFileClose on all modules.
//
// NOTE(review): this listing omits original source lines 519 and 527
// inside the body — confirm against the real file.
514 ::StatusCode Worker ::
515 finalize ()
516 {
517 using namespace msgEventLoop;
518
520
521 if (m_algorithmsInitialized == false)
522 {
523 ANA_MSG_ERROR ("algorithms never got initialized");
524 return StatusCode::FAILURE;
525 }
526
528 for (auto& module : m_modules)
529 ANA_CHECK (module->onFinalize (*this));
530 for (auto& output : m_outputs)
531 {
// skip the histogram stream (handled further down) and any entry whose
// key differs from the stream's main name
532 if (output.first != Job::histogramStreamName && output.second->mainStreamName() == output.first)
533 {
534 output.second->saveOutput ();
535 output.second->close ();
// record the final file name in the output, if there is one
536 std::string path = output.second->finalFileName ();
537 if (!path.empty())
538 addOutputList ("EventLoop_OutputStream_" + output.first, new TObjString (path.c_str()));
539 }
540 }
541 for (auto& module : m_modules)
542 ANA_CHECK (module->postFinalize (*this));
// the job statistics tree is only filled and published if it has branches
543 if (m_jobStats->GetListOfBranches()->GetEntries() > 0)
544 {
545 if (m_jobStats->Fill() <= 0)
546 {
547 ANA_MSG_ERROR ("failed to fill the job statistics tree");
548 return ::StatusCode::FAILURE;
549 }
550 ModuleData::addOutput (std::move (m_jobStats));
551 }
// the histogram output is saved both before and after onWorkerEnd
552 m_histOutput->saveOutput ();
553 for (auto& module : m_modules)
554 ANA_CHECK (module->onWorkerEnd (*this));
555 m_histOutput->saveOutput ();
556 m_histOutput->close ();
557
558 for (auto& module : m_modules){
559 ANA_CHECK (module->postFileClose(*this));
560 }
561 ANA_MSG_INFO ("worker finished successfully");
562 return ::StatusCode::SUCCESS;
563 }
564
565
566
// Process the events of one event range.
//
// Opens the input file named by eventRange.m_url, validates the range
// against the number of entries in the file (replacing an end of
// EventRange::eof with that number), runs the pending module callbacks
// (onInitialize, onNewInputFile, onFileExecute) and then calls onExecute
// on every module for each event in [m_beginEvent, m_endEvent).
//
// eventRange is modified: m_endEvent is overwritten when it was eof.
// Returns FAILURE if the range lies outside the file, if any module
// callback fails, or if an ANA_CHECK fails.
//
// NOTE(review): this listing omits original source lines 572, 599 and
// 636 inside the body — confirm against the real file.
567 ::StatusCode Worker ::
568 processEvents (EventRange& eventRange)
569 {
570 using namespace msgEventLoop;
571
573 RCU_REQUIRE (!eventRange.m_url.empty());
574 RCU_REQUIRE (eventRange.m_beginEvent >= 0);
575 RCU_REQUIRE (eventRange.m_endEvent == EventRange::eof || eventRange.m_endEvent >= eventRange.m_beginEvent);
576
577 ANA_CHECK (openInputFile (eventRange.m_url));
578
579 if (eventRange.m_beginEvent > inputFileNumEntries())
580 {
581 ANA_MSG_ERROR ("first event (" << eventRange.m_beginEvent << ") points beyond last event in file (" << inputFileNumEntries() << ")");
582 return ::StatusCode::FAILURE;
583 }
584 if (eventRange.m_endEvent == EventRange::eof)
585 {
586 eventRange.m_endEvent = inputFileNumEntries();
587 } else if (eventRange.m_endEvent > inputFileNumEntries())
588 {
589 ANA_MSG_ERROR ("end event (" << eventRange.m_endEvent << ") points beyond last event in file (" << inputFileNumEntries() << ")");
590 return ::StatusCode::FAILURE;
591 }
592
593 m_inputEntry = eventRange.m_beginEvent;
594
595 if (m_algorithmsInitialized == false)
596 {
597 for (auto& module : m_modules)
598 ANA_CHECK (module->onInitialize (*this));
// NOTE(review): original source line 599 is missing here; no update of
// m_algorithmsInitialized is visible in this listing
600 }
601
602 if (m_newInputFile)
603 {
604 m_newInputFile = false;
605 for (auto& module : m_modules)
606 ANA_CHECK (module->onNewInputFile (*this));
607 }
608
// onFileExecute only runs for a range starting at the first event
609 if (eventRange.m_beginEvent == 0)
610 {
611 for (auto& module : m_modules)
612 ANA_CHECK (module->onFileExecute (*this));
613 }
614
615 ANA_MSG_INFO ("Processing events " << eventRange.m_beginEvent << "-" << eventRange.m_endEvent << " in file " << eventRange.m_url);
616
617 for (uint64_t event = eventRange.m_beginEvent;
618 event != uint64_t (eventRange.m_endEvent);
619 ++ event)
620 {
621 m_inputEntry = event;
622 for (auto& module : m_modules)
623 {
624 if (module->onExecute (*this).isFailure())
625 {
626 ANA_MSG_ERROR ("processing event " << treeEntry() << " on file " << inputFileName());
627 return ::StatusCode::FAILURE;
628 }
629 }
630 if (m_firstEvent)
631 {
632 m_firstEvent = false;
633 for (auto& module : m_modules)
634 ANA_CHECK (module->postFirstEvent (*this));
635 }
// NOTE(review): original source line 636 is missing here; no update of
// m_eventsProcessed is visible in this listing
637 if (m_eventsProcessed % 10000 == 0)
638 ANA_MSG_INFO ("Processed " << m_eventsProcessed << " events");
639 }
640 return ::StatusCode::SUCCESS;
641 }
642
643
644
645 bool Worker ::
646 fileOpenErrorFilter(int level, bool /*b1*/, const char* s1, const char * s2)
647 {
648 // Don't fail on missing dictionary messages.
649 if (strstr (s2, "no streamer or dictionary") != nullptr) {
650 return true;
651 }
652
653 // For messages above warning level (SysError, Error, Fatal)
654 if( level > kWarning ) {
655 // We won't output further; ROOT should have already put something in the log file
656 std::string msg = "ROOT error detected in Worker.cxx: ";
657 msg += s1;
658 msg += " ";
659 msg += s2;
660 throw std::runtime_error(msg);
661
662 // No need for further error handling
663 return false;
664 }
665
666 // Pass to the default error handlers
667 return true;
668 }
669
// Open the given input file without processing it.
//
// Does nothing if the URL matches the file already open.  Otherwise any
// currently open file is closed first (with the close callbacks on all
// modules, unless the file was never used), and an empty URL stops after
// that.  The new file is then opened through SH::openFile, the input tree
// is looked up, and the modules are told about the first or next input
// file.
//
// Returns FAILURE if the file cannot be opened or is a zombie (after
// calling reportInputFailure on all modules), or if an ANA_CHECK fails.
//
// NOTE(review): this listing omits original source lines 676, 678, 729,
// 741 and 747 inside the body, leaving some statements incomplete —
// confirm against the real file.
670 ::StatusCode Worker ::
671 openInputFile (const std::string& inputFileUrl)
672 {
673 using namespace msgEventLoop;
674
675 // Enable custom error handling in a nice way
677
679
// same file as before: nothing to do
680 if (m_inputFileUrl == inputFileUrl)
681 return ::StatusCode::SUCCESS;
682
683 if (!m_inputFileUrl.empty())
684 {
// the close callbacks only run if onNewInputFile was already consumed
// for this file (see processEvents)
685 if (m_newInputFile == false)
686 {
687 for (auto& module : m_modules)
688 ANA_CHECK (module->onCloseInputFile (*this));
689 for (auto& module : m_modules)
690 ANA_CHECK (module->postCloseInputFile (*this));
691 }
692 m_newInputFile = false;
693 m_hasInputEvents = false;
694 m_inputTree = nullptr;
695 m_inputFile.reset ();
696 m_inputFileUrl.clear ();
697 }
698
// an empty URL only closes the current file
699 if (inputFileUrl.empty())
700 return ::StatusCode::SUCCESS;
701
702 ANA_MSG_INFO ("Opening file " << inputFileUrl);
703 std::unique_ptr<TFile> inputFile;
704 try
705 {
706 inputFile = SH::openFile (inputFileUrl, *metaData());
707 } catch (...)
708 {
// the exception is reported and not rethrown here; inputFile stays null
// and is handled by the check below
709 Detail::report_exception (std::current_exception());
710 }
711 if (inputFile.get() == 0)
712 {
713 ANA_MSG_ERROR ("failed to open file " << inputFileUrl);
714 for (auto& module : m_modules)
715 module->reportInputFailure (*this);
716 return ::StatusCode::FAILURE;
717 }
718 if (inputFile->IsZombie())
719 {
720 ANA_MSG_ERROR ("input file is a zombie: " << inputFileUrl);
721 for (auto& module : m_modules)
722 module->reportInputFailure (*this);
723 return ::StatusCode::FAILURE;
724 }
725
726 // Direct TTree access
727 TTree *tree = 0;
// NOTE(review): the initializer of treeName (original source line 729) is
// missing from this listing
728 const std::string treeName
730 tree = dynamic_cast<TTree*>(inputFile->Get (treeName.c_str()));
// a missing tree is not an error: it is only logged
731 if (tree == nullptr)
732 {
733 ANA_MSG_INFO ("tree " << treeName << " not found in input file: " << inputFileUrl);
734 ANA_MSG_INFO ("treating this like a tree with no events");
735 }
736 else {
737 m_hasInputEvents = (tree->GetEntries() > 0);
738 }
739
740 m_newInputFile = true;
742 m_inputEntry = 0;
743 m_inputFile = std::move (inputFile);
// inputFileUrl is a const reference, so this std::move results in a copy
744 m_inputFileUrl = std::move (inputFileUrl);
745
746 // onFirstInputFile to setup Event object
// NOTE(review): the condition guarding this branch (original source line
// 747) is missing from this listing
748 {
749 for (auto& module : m_modules)
750 ANA_CHECK (module->onFirstInputFile (*this));
751 m_firstInputFile = false;
752 }
753 else {
754 for (auto& module : m_modules)
755 ANA_CHECK (module->onNextInputFile (*this));
756 }
757
758 // Check if we have input events - done above for TTree
759 if (m_inputTree == nullptr) {
760 if (m_event) m_hasInputEvents = (m_event->getEntries() > 0);
761 }
762
763 return ::StatusCode::SUCCESS;
764 }
765
766
767
// Add another output stream under the given label.
//
// Returns FAILURE if the label is already present in m_outputs or if the
// stream data has no file attached.  If the stream data has no main
// stream name yet, the label becomes its main stream name.  The stream
// data is then moved into a shared_ptr and stored in m_outputs.
//
// NOTE(review): this listing omits original source lines 770 (the rest of
// the parameter list, which presumably declares `data`) and 773 —
// confirm against the real file.
768 ::StatusCode Worker ::
769 addOutputStream (const std::string& label,
771 {
772 using namespace msgEventLoop;
774
775 if (m_outputs.find (label) != m_outputs.end())
776 {
777 ANA_MSG_ERROR ("output file already defined for label: " + label);
778 return ::StatusCode::FAILURE;
779 }
780 if (data.file() == nullptr)
781 {
782 ANA_MSG_ERROR ("output stream does not have a file attached");
783 return ::StatusCode::FAILURE;
784 }
785 if (data.mainStreamName().empty())
786 data.setMainStreamName (label);
787 m_outputs.insert (std::make_pair (label, std::make_shared<Detail::OutputStreamData>(std::move (data))));
788 return ::StatusCode::SUCCESS;
789 }
790
791
792
793 Long64_t Worker ::
794 inputFileNumEntries () const
795 {
796 RCU_READ_INVARIANT (this);
797 RCU_REQUIRE (inputFile() != 0);
798
799 if (m_event) {
800 return m_event->getEntries();
801 }
802 else if (m_inputTree != 0)
803 return m_inputTree->GetEntries();
804 else
805 return 0;
806 }
807
808
809
810 uint64_t Worker ::
811 eventsProcessed () const noexcept
812 {
813 RCU_READ_INVARIANT (this);
814 return m_eventsProcessed;
815 }
816
817
818
// Run the job on one sample directly in this process.
//
// Sets up the worker from the sample's meta-data (with defaults fetched
// from options), the output location and the sample name, takes a copy of
// the job configuration, registers one output stream per job output,
// configures a DirectInputModule with the sample's file list and finally
// calls finalize().
//
// NOTE(review): this listing omits original source lines 824, 840 and
// 856-857 inside the body; in particular no initialize()/processInputs()
// call is visible before finalize() — confirm against the real file.
819 ::StatusCode Worker ::
820 directExecute (const SH::SamplePtr& sample, const Job& job,
821 const std::string& location, const SH::MetaObject& options)
822 {
823 using namespace msgEventLoop;
825
// work on a local copy of the sample meta-data; setMetaData() stores a
// pointer to this local object
826 SH::MetaObject meta (*sample->meta());
827 meta.fetchDefaults (options);
828
829 setMetaData (&meta);
830 setOutputHist (location);
831 setSegmentName (sample->name());
832
833 ANA_MSG_INFO ("Running sample: " << sample->name());
834
835 setJobConfig (JobConfig (job.jobConfig()));
836
837 for (Job::outputIter out = job.outputBegin(),
838 end = job.outputEnd(); out != end; ++ out)
839 {
// NOTE(review): the declaration of `data` (original source line 840) is
// missing from this listing
841 out->output()->makeWriter (sample->name(), "", ".root")};
842 ANA_CHECK (addOutputStream (out->label(), std::move (data)));
843 }
844
845 {
846 m_moduleConfig.emplace_back ("EL::Detail::DirectInputModule/DirectInputModule");
847 ANA_CHECK (m_moduleConfig.back().setProperty ("fileList", sample->makeFileList()));
// maxEvents/skipEvents are only set when they differ from -1 and 0
848 Long64_t maxEvents = metaData()->castDouble (Job::optMaxEvents, -1);
849 if (maxEvents != -1)
850 ANA_CHECK (m_moduleConfig.back().setProperty ("maxEvents", maxEvents));
851 Long64_t skipEvents = metaData()->castDouble (Job::optSkipEvents, 0);
852 if (skipEvents != 0)
853 ANA_CHECK (m_moduleConfig.back().setProperty ("skipEvents", skipEvents));
854 }
855
858 ANA_CHECK (finalize ());
859 return ::StatusCode::SUCCESS;
860 }
861
862
863
// Run one segment of a batch job.
//
// Reads the BatchJob object named "job" from the configuration file,
// selects the segment with index job_id and its sample, sets up the
// worker from them, registers one output stream per job output,
// configures a BatchInputModule and calls finalize().  On success a file
// "<location>/status/completed-<job_id>" is created.
//
// Returns FAILURE if the file or the BatchJob object cannot be read, if
// job_id is out of range, or if an ANA_CHECK fails.  Any exception is
// caught, reported and turned into FAILURE.
//
// NOTE(review): this listing omits original source lines 868, 909 and
// 922-923 inside the body; in particular no initialize()/processInputs()
// call is visible before finalize() — confirm against the real file.
864 ::StatusCode Worker ::
865 batchExecute (unsigned job_id, const char *confFile)
866 {
867 using namespace msgEventLoop;
869
870 try
871 {
872 std::unique_ptr<TFile> file (TFile::Open (confFile, "READ"));
873 if (file.get() == nullptr || file->IsZombie())
874 {
875 ANA_MSG_ERROR ("failed to open file: " << confFile);
876 return ::StatusCode::FAILURE;
877 }
878
879 std::unique_ptr<BatchJob> job (dynamic_cast<BatchJob*>(file->Get ("job")));
880 m_batchJob = job.get();
881 if (job.get() == nullptr)
882 {
883 ANA_MSG_ERROR ("failed to retrieve BatchJob object");
884 return ::StatusCode::FAILURE;
885 }
886
887 if (job_id >= job->segments.size())
888 {
889 ANA_MSG_ERROR ("invalid job-id " << job_id << ", max is " << job->segments.size());
890 return ::StatusCode::FAILURE;
891 }
892 BatchSegment *segment = &job->segments[job_id];
893 RCU_ASSERT (segment->job_id == job_id);
894 RCU_ASSERT (segment->sample < job->samples.size());
895 BatchSample *sample = &job->samples[segment->sample];
896
897 gSystem->Exec ("pwd");
898 gSystem->MakeDirectory ("output");
899
900 setMetaData (&sample->meta);
901 setOutputHist (job->location + "/fetch");
902 setSegmentName (segment->fullName);
903
904 setJobConfig (JobConfig (job->job.jobConfig()));
905
906 for (Job::outputIter out = job->job.outputBegin(),
907 end = job->job.outputEnd(); out != end; ++ out)
908 {
// NOTE(review): the declaration of `data` (original source line 909) is
// missing from this listing
910 out->output()->makeWriter (segment->sampleName, segment->segmentName, ".root")};
911 ANA_CHECK (addOutputStream (out->label(), std::move (data)));
912 }
913
914 {
915 m_moduleConfig.emplace_back ("EL::Detail::BatchInputModule/BatchInputModule");
916 ANA_CHECK (m_moduleConfig.back().setProperty ("jobId", job_id));
917 Long64_t maxEvents = metaData()->castDouble (Job::optMaxEvents, -1);
918 if (maxEvents != -1)
919 ANA_CHECK (m_moduleConfig.back().setProperty ("maxEvents", maxEvents));
920 }
921
924 ANA_CHECK (finalize ());
925
// creating this (empty) file marks the segment as completed
926 std::ostringstream job_name;
927 job_name << job_id;
928 std::ofstream completed ((job->location + "/status/completed-" + job_name.str()).c_str());
929 return ::StatusCode::SUCCESS;
930 } catch (...)
931 {
932 Detail::report_exception (std::current_exception());
933 return ::StatusCode::FAILURE;
934 }
935 }
936
937
938
// Run the job as a grid worker.
//
// Reads the sample meta-data (falling back to "defaultMetaObject"), the
// job configuration and the list of outputs from "jobdef.root", sets up
// the worker with output location ".", registers one output stream per
// listed output, reads the input file names from "input.txt" (one or more
// comma-separated names per line), configures a DirectInputModule and
// calls finalize().
//
// sampleName is the key used to look up the meta-data object.
// nEventsPerJob is passed on as "maxEvents" unless it is -1; SkipEvents
// is passed on as "skipEvents" unless it is 0.
//
// Returns FAILURE if any object cannot be read from "jobdef.root" or an
// ANA_CHECK fails.  If "input.txt" yields no file names, the process is
// terminated through gSystem->Exit(EC_BADINPUT).
//
// NOTE(review): this listing omits original source lines 943, 1021 and
// 1057-1058 inside the body; in particular no initialize()/processInputs()
// call is visible before finalize() — confirm against the real file.
939 ::StatusCode Worker ::
940 gridExecute (const std::string& sampleName, Long64_t SkipEvents, Long64_t nEventsPerJob)
941 {
942 using namespace msgEventLoop;
944
945 ANA_MSG_INFO ("Running with ROOT version " << gROOT->GetVersion()
946 << " (" << gROOT->GetVersionDate() << ")");
947
948 ANA_MSG_INFO ("Loading EventLoop grid job");
949
950
951 TList bigOutputs;
952 std::unique_ptr<JobConfig> jobConfig;
953 SH::MetaObject *mo = 0;
954
955 std::unique_ptr<TFile> f (TFile::Open("jobdef.root"));
956 if (f == nullptr || f->IsZombie()) {
957 ANA_MSG_ERROR ("Could not read jobdef");
958 return ::StatusCode::FAILURE;
959 }
960
// sample-specific meta-data first, then the default object
961 mo = dynamic_cast<SH::MetaObject*>(f->Get(sampleName.c_str()));
962 if (!mo)
963 mo = dynamic_cast<SH::MetaObject*>(f->Get("defaultMetaObject"));
964 if (!mo) {
965 ANA_MSG_ERROR ("Could not read in sample meta object");
966 return ::StatusCode::FAILURE;
967 }
968
969 jobConfig.reset (dynamic_cast<JobConfig*>(f->Get("jobConfig")));
970 if (jobConfig == nullptr)
971 {
972 ANA_MSG_ERROR ("failed to read jobConfig object");
973 return ::StatusCode::FAILURE;
974 }
975
976 {
977 std::unique_ptr<TList> outs ((TList*)f->Get("outputs"));
978 if (outs == nullptr)
979 {
980 ANA_MSG_ERROR ("Could not read list of outputs");
981 return ::StatusCode::FAILURE;
982 }
983
// collect the output streams; any other kind of entry is an error
984 TIter itr(outs.get());
985 TObject *obj = 0;
986 while ((obj = itr())) {
987 EL::OutputStream * out = dynamic_cast<EL::OutputStream*>(obj);
988 if (out) {
989 bigOutputs.Add(out);
990 }
991 else {
992 ANA_MSG_ERROR ("Encountered unexpected entry in list of outputs");
993 return ::StatusCode::FAILURE;
994 }
995 }
996 }
997
998 f->Close();
999 f.reset ();
1000
1001 const std::string location = ".";
1002
// grid reporting is always switched on for grid jobs
1003 mo->setBool (Job::optGridReporting, true);
1004 setMetaData (mo);
1005 setOutputHist (location);
1006 setSegmentName ("output");
1007
1008 ANA_MSG_INFO ("Starting EventLoop Grid worker");
1009
1010 {//Create and register the "big" output files with base class
1011 TIter itr(&bigOutputs);
1012 TObject *obj = 0;
1013 while ((obj = itr())) {
1014 EL::OutputStream *os = dynamic_cast<EL::OutputStream*>(obj);
1015 if (os == nullptr)
1016 {
1017 ANA_MSG_ERROR ("Bad input");
1018 return ::StatusCode::FAILURE;
1019 }
1020 {
// NOTE(review): the declaration of `data` (original source line 1021) is
// missing from this listing
1022 location + "/" + os->label() + ".root", "RECREATE"};
1023 ANA_CHECK (addOutputStream (os->label(), std::move (data)));
1024 }
1025 }
1026 }
1027
1028 setJobConfig (std::move (*jobConfig));
1029
1030 {
// every line of input.txt may hold several comma-separated file names
1031 std::vector<std::string> fileList;
1032 std::ifstream infile("input.txt");
1033 while (infile) {
1034 std::string sLine;
1035 if (!getline(infile, sLine)) break;
1036 std::istringstream ssLine(sLine);
1037 while (ssLine) {
1038 std::string sFile;
1039 if (!getline(ssLine, sFile, ',')) break;
1040 fileList.push_back(sFile);
1041 }
1042 }
1043 if (fileList.size() == 0) {
1044 ANA_MSG_ERROR ("no input files provided");
1045 //User was expecting input after all.
1046 gSystem->Exit(EC_BADINPUT);
1047 }
1048 m_moduleConfig.emplace_back ("EL::Detail::DirectInputModule/DirectInputModule");
1049 ANA_CHECK (m_moduleConfig.back().setProperty ("fileList", fileList));
1050
1051 if (nEventsPerJob != -1)
1052 ANA_CHECK (m_moduleConfig.back().setProperty ("maxEvents", nEventsPerJob));
1053 if (SkipEvents != 0)
1054 ANA_CHECK (m_moduleConfig.back().setProperty ("skipEvents", SkipEvents));
1055 }
1056
1059 ANA_CHECK (finalize ());
1060
// note: eventsProcessed() returns uint64_t, which is narrowed to int here
1061 int nEvents = eventsProcessed();
1062 ANA_MSG_INFO ("Loop finished.");
1063 ANA_MSG_INFO ("Read/processed " << nEvents << " events.");
1064
1065 ANA_MSG_INFO ("EventLoop Grid worker finished");
1066 ANA_MSG_INFO ("Saving output");
1067 return ::StatusCode::SUCCESS;
1068 }
1069}
#define RCU_INVARIANT(x)
Definition Assert.h:201
#define RCU_ASSERT(x)
Definition Assert.h:222
#define RCU_DESTROY_INVARIANT(x)
Definition Assert.h:235
#define RCU_CHANGE_INVARIANT(x)
Definition Assert.h:231
#define RCU_NEW_INVARIANT(x)
Definition Assert.h:233
#define RCU_REQUIRE(x)
Definition Assert.h:208
#define RCU_REQUIRE_SOFT(x)
Definition Assert.h:153
#define RCU_READ_INVARIANT(x)
Definition Assert.h:229
#define ANA_MSG_INFO(xmsg)
Macro printing info messages.
#define ANA_MSG_ERROR(xmsg)
Macro printing error messages.
#define ANA_MSG_DEBUG(xmsg)
Macro printing debug messages.
#define ANA_CHECK(EXP)
check whether the given expression was successful
char data[hepevt_bytes_allocation_ATLAS]
Definition HepEvt.cxx:11
#define RCU_THROW_MSG(message)
Definition PrintMsg.h:58
Run a MT piece of code with an alternate root error handler.
all data needed to manage a given output stream
the job configuration that is independent of driver and dataset
Definition JobConfig.h:45
Definition Job.h:51
static const std::string optPrintPerFileStats
description: the option to turn on printing of i/o statistics at the end of each file rationale: whil...
Definition Job.h:413
static const std::string optMemFailOnLeak
Failure behaviour of the code when a "significant memory leak" is found.
Definition Job.h:595
static const std::string optMaxEvents
description: the name of the option used for setting the maximum number of events to process per samp...
Definition Job.h:230
static const std::string optGridReporting
whether to use grid reporting even when not running on the grid
Definition Job.h:498
static const std::string optAlgorithmTimer
a boolean flag for whether to add a timer for the algorithms
Definition Job.h:210
static const std::string optMemResidentIncreaseLimit
The minimal resident memory increase necessary to trigger an error.
Definition Job.h:573
static const std::string optXAODPerfStats
description: the name of the option for turning on XAODPerfStats.
Definition Job.h:361
const OutputStream * outputIter
Definition Job.h:148
static const std::string optXAODSummaryReport
the option to turn on/off the xAOD summary reporting at the end of the job
Definition Job.h:403
static const std::string optCacheLearnEntries
description: this option allows to configure the number of tree entries used for learning cache behav...
Definition Job.h:332
static const std::string optCacheSize
description: this option allows to configure the TTreeCache size for this job.
Definition Job.h:317
static const std::string optAlgorithmMemoryMonitor
a boolean flag for whether to add a memory monitor for the algorithms
Definition Job.h:215
static const std::string optXAODInput
the option to select whether our input is xAODs
Definition Job.h:398
static const std::string optMemResidentPerEventIncreaseLimit
The minimal per-event resident memory increase for triggering an error.
Definition Job.h:555
static const std::string optMemVirtualIncreaseLimit
The minimal virtual memory increase necessary to trigger an error.
Definition Job.h:581
static const std::string optMemVirtualPerEventIncreaseLimit
The minimal per-event virtual memory increase for triggering an error.
Definition Job.h:565
static const std::string optSkipEvents
description: the name of the option used for skipping a certain number of events in the beginning rat...
Definition Job.h:238
static const std::string optStreamAliases
an option for stream aliases
Definition Job.h:222
static const std::string optFactoryPreload
a boolean flag for whether to perform a component factory preload
Definition Job.h:219
static const std::string histogramStreamName
the name of the histogram output stream
Definition Job.h:606
Long64_t treeEntry() const override
description: the entry in the tree we are reading guarantee: no-fail
Definition Worker.cxx:220
std::string inputFileName() const override
the name of the file we are reading the current tree from, without the path component
Definition Worker.cxx:245
@ EC_BADINPUT
Definition Worker.h:260
void addOutputList(const std::string &name, TObject *output_swallow) override
effects: add a given object to the output.
Definition Worker.cxx:104
::StatusCode addTree(const TTree &tree, const std::string &stream) final override
effects: adds a tree to an output file specified by the stream/label failures: Incorrect stream/label...
Definition Worker.cxx:157
TFile * inputFile() const override
description: the file we are reading the current tree from guarantee: no-fail
Definition Worker.cxx:229
TTree * getOutputTree(const std::string &name, const std::string &stream) const final override
effects: get the tree that was added to an output file earlier failures: Tree doesn't exist
Definition Worker.cxx:179
static bool fileOpenErrorFilter(int level, bool, const char *, const char *)
Error handler for file opening.
Definition Worker.cxx:646
void setOutputHist(const std::string &val_outputTarget)
set the histogram output list
Definition Worker.cxx:354
std::vector< asg::AsgComponentConfig > m_moduleConfig
the module configurations we use
Definition Worker.h:457
::StatusCode addOutputStream(const std::string &label, Detail::OutputStreamData output)
effects: add another output file guarantee: strong failures: low level errors II failures: label alre...
Definition Worker.cxx:769
const SH::MetaObject * metaData() const override
description: the sample meta-data we are working on guarantee: no-fail invariant: metaData !...
Definition Worker.cxx:202
std::string m_segmentName
the name of the segment we are processing
Definition Worker.h:442
::StatusCode finalize()
finalize the worker
Definition Worker.cxx:515
bool m_firstInputFile
whether this is the first input file
Definition Worker.h:426
void setJobConfig(JobConfig &&jobConfig)
set the JobConfig
Definition Worker.cxx:374
::StatusCode openInputFile(const std::string &inputFileUrl) override
open the given input file without processing it
Definition Worker.cxx:671
uint64_t eventsProcessed() const noexcept
the number of events that have been processed
Definition Worker.cxx:811
bool m_newInputFile
whether this is a new input file (i.e.
Definition Worker.h:432
TTree * tree() const override
description: the tree we are running on; guarantee: no-fail
Definition Worker.cxx:211
void setMetaData(const SH::MetaObject *val_metaData)
set the metaData
Definition Worker.cxx:343
bool m_algorithmsInitialized
whether the algorithms are initialized
Definition Worker.h:447
std::vector< std::unique_ptr< Detail::Module > > m_modules
the list of modules we hold
Definition Worker.h:421
void addOutput(TObject *output_swallow) final override
effects: add an object to the output.
Definition Worker.cxx:90
::StatusCode initialize()
initialize the worker
Definition Worker.cxx:386
TFile * getOutputFileNull(const std::string &label) const override
effects: get the output file that goes into the dataset with the given label.
Definition Worker.cxx:145
void setSegmentName(const std::string &val_segmentName)
set the segment name
Definition Worker.cxx:364
Long64_t inputFileNumEntries() const override
the number of events in the input file
Definition Worker.cxx:794
::StatusCode processInputs()
process all the inputs
Definition Worker.cxx:500
bool m_firstEvent
whether we are still to process the first event
Definition Worker.h:452
std::string m_outputTarget
the target file to which we will write the histogram output
Definition Worker.h:437
Run an MT piece of code with an alternate ROOT error handler.
A class that manages meta-data to be associated with an object.
Definition MetaObject.h:56
void setBool(const std::string &name, bool value)
set the meta-data boolean with the given name
A smart pointer class that holds a single Sample object.
Definition SamplePtr.h:35
Base class for the event (xAOD::TEvent and xAOD::REvent) classes.
Definition Event.h:60
A relatively simple transient store for objects created in analysis.
Definition TStore.h:45
const int nEvents
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:179
std::string label(const std::string &format, int i)
Definition label.h:19
void report_exception(std::exception_ptr eptr)
print out the currently evaluated exception
This module defines the arguments passed from the BATCH driver to the BATCH worker.
::StatusCode StatusCode
StatusCode definition for legacy code.
bool SetDirectory(TObject *object, TDirectory *directory)
effects: set the directory this object is associated with returns: whether the object type actively k...
Definition RootUtils.cxx:28
std::unique_ptr< TFile > openFile(const std::string &name, const MetaObject &options)
open a file with the given options
-diff
void initialize()
UInt_t job_id
description: the job id of this segment
std::string segmentName
the name/id to use for this segment (not including the sample name)
std::string fullName
the name/id to use for this segment (including the sample name)
UInt_t sample
description: the index of the sample we are using
std::string sampleName
the name of the sample for this segment
std::string m_inputFileUrl
the input file url of the currently opened file
Definition ModuleData.h:70
const SH::MetaObject * m_metaData
the meta-data we use
Definition ModuleData.h:89
Worker * m_worker
the worker (to pass on to the algorithms)
Definition ModuleData.h:110
TTree * m_inputTree
the (main) tree in the input file
Definition ModuleData.h:76
uint64_t m_inputEntry
the entry in the input tree we are currently looking at
Definition ModuleData.h:80
BatchJob * m_batchJob
the BatchJob configuration (if used)
Definition ModuleData.h:116
xAOD::TStore * m_tstore
the TStore structure, if we use one
Definition ModuleData.h:104
bool m_skipEvent
whether we are skipping the current event
Definition ModuleData.h:86
OutputStreamData * m_histOutput
the histogram output stream
Definition ModuleData.h:95
std::unique_ptr< TTree > m_jobStats
Tree saving per-job statistics information.
Definition ModuleData.h:98
bool m_hasInputEvents
flag whether the most recently opened input file has events or not
Definition ModuleData.h:83
std::unique_ptr< TFile > m_inputFile
the input file pointer of the currently opened file
Definition ModuleData.h:73
std::map< std::string, std::shared_ptr< Detail::OutputStreamData > > m_outputs
the list of output files
Definition ModuleData.h:113
std::vector< Detail::AlgorithmData > m_algs
the list of algorithms
Definition ModuleData.h:67
uint64_t m_eventsProcessed
the number of events that have been processed
Definition ModuleData.h:92
xAOD::Event * m_event
the Event object, if we use one
Definition ModuleData.h:101
a range of events in a given file
Definition EventRange.h:22
std::string m_url
the location of the file
Definition EventRange.h:24
static constexpr Long64_t eof
the special value to indicate that the range includes all events until the end of the file
Definition EventRange.h:34
Long64_t m_beginEvent
the first event to process
Definition EventRange.h:27
Long64_t m_endEvent
the event past the last event, or eof
Definition EventRange.h:30
static const std::string treeName_default
the default value of treeName
Definition MetaFields.h:55
static const std::string treeName
the name of the tree in the sample
Definition MetaFields.h:52
MsgStream & msg
Definition testRead.cxx:32
TFile * file