ATLAS Offline Software
DecisionSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "DecisionSvc.h"
6 
7 // Gaudi include files
8 #include "GaudiKernel/EventContext.h"
10 #include <algorithm>
11 
12 DecisionSvc::DecisionSvc(const std::string& name,
13  ISvcLocator* pSvcLocator ) :
14  base_class(name, pSvcLocator),
15  m_evtStore("StoreGateSvc",name),
16 #ifdef SIMULATIONBASE
17  m_cutflowSvc("",name),
18 #else
19  m_cutflowSvc("CutFlowSvc/CutFlowSvc",name),
20 #endif
21  m_algstateSvc("AlgExecStateSvc",name)
22 {
23 }
24 
26 {
27 }
28 
31 {
32  // Decode the accept, required and veto Algorithms.
33  // The logic is the following:
34  // a. The event is accepted if all lists are empty.
35  // b. The event is provisionally accepted if any Algorithm in the
36  // accept list
37  // has been executed and has indicated that its filter is passed. This
38  // provisional acceptance can be overridden by the other lists.
39  // c. The event is rejected unless all Algorithms in the required list have
40  // been executed and have indicated that their filter passed.
41  // d. The event is rejected if any Algorithm in the veto list has been
42  // executed and has indicated that its filter has passed.
43 
44  // Set to be listener for BeginRun
45  // Declares top filters to ICutFlowSvc if (and only if) needed (David Cote, Sep 2010)
46 
47  // Get handle to exec state service for retrieving decisions
48  ATH_CHECK( m_algstateSvc.retrieve() );
49 
50  ATH_CHECK(m_evtStore.retrieve());
51 
52  return StatusCode::SUCCESS;
53 }
54 
55 
58 {
59  ATH_MSG_INFO ("Finalized successfully.");
60  return StatusCode::SUCCESS;
61 }
62 
65 {
66  StatusCode status = StatusCode::SUCCESS;
67 
68  // Fill up m_streamNames vector
69  std::set<std::string> uniStreams;
70  m_streamNames.clear();
71  // first take list of streams from accept streams
72  for(auto iter = m_stream_accept.begin();
73  iter != m_stream_accept.end(); ++iter){
74  uniStreams.insert(iter->first);
75  }
76  // then require
77  for(auto iter = m_stream_require.begin();
78  iter != m_stream_require.end(); ++iter){
79  uniStreams.insert(iter->first);
80  }
81  // then veto
82  for(auto iter = m_stream_veto.begin();
83  iter != m_stream_veto.end(); ++iter){
84  uniStreams.insert(iter->first);
85  }
86  std::copy(uniStreams.begin(),uniStreams.end(),std::back_inserter(m_streamNames));
87 
88  return status;
89 }
90 
91 
93 // Non-const methods:
95 
96 
98 DecisionSvc::addStream(const std::string& stream)
99 {
100  StatusCode status = StatusCode::SUCCESS;
101  if(m_frozen != true){
102  // check if this stream already exist
103  auto it = m_stream_accept.find(stream);
104  if(it != m_stream_accept.end()){
105  // ok, it exists, then do nothing
106  ATH_MSG_WARNING("Stream name : " << stream << " already been registered!");
107  status = StatusCode::FAILURE;
108  }else{
109  //if the stream doesn't exist yet, then insert it to the accept list with an empty vector of Algs
110  std::vector<std::string> tmpvec;
111  tmpvec.clear();
112  ATH_MSG_INFO("Inserting stream: "<< stream << " with no Algs");
113  m_stream_accept.insert(std::make_pair(stream, tmpvec));
114  status = StatusCode::SUCCESS;
115  }
116  }
117  return status;
118 }
119 
121 DecisionSvc::fillMap(std::map<std::string, std::vector<std::string> >& streamsModeMap,
122  const std::string& name, const std::string& stream)
123 {
124  // check if this stream already exist
125  std::map<std::string, std::vector<std::string> >::iterator it = streamsModeMap.find(stream);
126  if(it != streamsModeMap.end()){
127  // ok, it exists, then check if the algname was already been inserted
128 
129 
130  bool algexist = false;
131  // Check if alg already registered for this stream
132  for(auto vit = (it->second).begin();
133  vit != (it->second).end(); ++vit) {
134  if((*vit) == name){
135  algexist = true;
136  // it seems the alg was already inserted, warn the user
137  ATH_MSG_ERROR("Alg name : " << name
138  << " of stream " << stream
139  << " has already been registered!");
140  return StatusCode::FAILURE;
141  }
142  }
143 
144 
145  // So, if the stream exist but the alg has not been registered
146  // update its content std::vector with a alg
147  if(algexist == false){
148  std::vector<std::string> &tmpvec = it->second;
149  tmpvec.push_back(name);
150  }
151 
152  //if the stream doesn't exist yet, then insert it
153  } else {
154  std::vector<std::string> tmpvec;
155  tmpvec.push_back(name);
156  streamsModeMap[stream] = std::move(tmpvec);
157  }
158 
159  return StatusCode::SUCCESS;
160 }
161 
163 DecisionSvc::addAcceptAlg(const std::string& name, const std::string& stream)
164 {
165  // Add alg to list of streams
166  auto it = std::find(m_streamNames.begin(), m_streamNames.end(),stream);
167  if (it != m_streamNames.end()) m_streamNames.push_back(stream);
168  // Fill map of stream to alg
170 }
171 
173 DecisionSvc::addRequireAlg(const std::string& name,
174  const std::string& stream)
175 {
176  // Add alg to list of streams
177  auto it = std::find(m_streamNames.begin(), m_streamNames.end(),stream);
178  if (it != m_streamNames.end()) m_streamNames.push_back(stream);
179  // Fill map of stream to alg
181 }
182 
184 DecisionSvc::addVetoAlg(const std::string& name,
185  const std::string& stream)
186 {
187  // Add alg to list of streams
188  auto it = std::find(m_streamNames.begin(), m_streamNames.end(),stream);
189  if (it != m_streamNames.end()) m_streamNames.push_back(stream);
190  // Fill map of stream to alg
192 }
193 
194 // Utility interface to retrieve unique list of streams
195 const std::vector<std::string> DecisionSvc::getStreams() const
196 {
197  return m_streamNames;
198 }
199 
200 // Utility interface
201 const std::vector<std::string>
202 DecisionSvc::getAcceptAlgs(const std::string& stream) const
203 {
204  auto iter = m_stream_accept.find(stream);
205  if (iter != m_stream_accept.end()) return iter->second;
206  return std::vector<std::string>();
207 }
208 
209 // Utility interface
210 const std::vector<std::string>
211 DecisionSvc::getRequireAlgs(const std::string& stream) const
212 {
213  auto iter = m_stream_require.find(stream);
214  if (iter != m_stream_require.end()) return iter->second;
215  return std::vector<std::string>();
216 }
217 
218 // Utility interface
219 const std::vector<std::string>
220 DecisionSvc::getVetoAlgs(const std::string& stream) const
221 {
222  auto iter = m_stream_veto.find(stream);
223  if (iter != m_stream_veto.end()) return iter->second;
224  return std::vector<std::string>();
225 }
226 
227 
228 
230 // Const methods:
232 
233 
234 bool
235 DecisionSvc::isEventAccepted( const std::string& stream ) const
236 {
237  EventContext ec;
238  ec.setValid(false);
239  ec.setSlot(0);
240  return isEventAccepted(stream,ec);
241 }
242 
243 bool
245  const EventContext& ectx) const
246 {
247 
248  ATH_MSG_DEBUG("In DecisionSvc::isEventAccepted( " << stream << " )");
249 
250  // By construction a stream is accepted
251  bool result = true;
252 
253  bool found_accept = false;
254  bool found_require = false;
255  bool found_veto = false;
256 
257  //Loop over all streams of accept type and find the one of interest
258  auto itAlgs = m_stream_accept.find(stream);
259  if(itAlgs != m_stream_accept.end()){
260  found_accept = true;
261  // get a handle of the streams' algos vector
262  const auto &vecAlgs = itAlgs->second;
263  // Loop over all Algorithms in the list to see
264  // whether any have been executed and have their filter
265  // passed flag set. Any match causes the event to be
266  // provisionally accepted.
267  if ( ! vecAlgs.empty( ) ) {
268  result = false;
269  for (auto it = vecAlgs.begin(); it != vecAlgs.end(); it++) {
270  bool isE,fp;
271  isE = (m_algstateSvc->algExecState(*it,ectx).state() == AlgExecState::State::Done);
272  fp = m_algstateSvc->algExecState(*it,ectx).filterPassed();
273  if (isE && fp) {
274  result = true;
275  break;
276  }
277  }
278  }
279  }
280 
281  //Loop over all streams of require type and find the one of interest
282  itAlgs = m_stream_require.find(stream);
283  if(itAlgs != m_stream_require.end()){
284  found_require = true;
285  // get a handle of the streams' algos vector
286  const auto &vecAlgs = itAlgs->second;
287  // Loop over all Algorithms in the list to see
288  // whether any have been executed and have their filter
289  // passed flag set. Any match causes the event to be
290  // provisionally accepted.
291  if ( ! vecAlgs.empty( ) ) {
292  for (auto it = vecAlgs.begin(); it != vecAlgs.end(); it++) {
293  bool isE,fp;
294  isE = (m_algstateSvc->algExecState(*it,ectx).state() == AlgExecState::State::Done);
295  fp = m_algstateSvc->algExecState(*it,ectx).filterPassed();
296  if (!isE || !fp) {
297  result = false;
298  break;
299  }
300  }
301  }
302  }
303 
304  //Loop over all streams of veto type and find the one of interest
305  itAlgs = m_stream_veto.find(stream);
306  if(itAlgs != m_stream_veto.end()){
307  found_veto = true;
308  // get a handle of the streams' algos vector
309  const auto &vecAlgs = itAlgs->second;
310  // Loop over all Algorithms in the list to see
311  // whether any have been executed and have their filter
312  // passed flag set. Any match causes the event to be
313  // provisionally accepted.
314  if ( ! vecAlgs.empty( ) ) {
315  for (auto it = vecAlgs.begin(); it != vecAlgs.end(); it++) {
316  bool isE,fp;
317  isE = (m_algstateSvc->algExecState(*it,ectx).state() == AlgExecState::State::Done);
318  fp = m_algstateSvc->algExecState(*it,ectx).filterPassed();
319  if ( isE && fp ) {
320  result = false;
321  break;
322  }
323  }
324  }
325  }
326 
327  if(found_accept == false && found_require == false && found_veto == false){
328  ATH_MSG_DEBUG("Stream: " << stream << " not found registered in DecisionSvc -- accepting event by default ");
329  }
330 
331  return result;
332 }
333 
335 {
336  ATH_MSG_DEBUG("in start");
337  CHECK( this->interpretAlgMap() );
338  m_frozen = true;
339 
340  // lambda to return true if second element is non-empty
341  auto teststreams = [](const auto& m)
342  {
343  for (const auto& p : m)
344  if (!p.second.empty()) return true;
345  return false;
346  };
347 
348  //Retrieve CutFlowSvc if (and only if) needed
349  if( teststreams(m_stream_accept) || teststreams(m_stream_require) || teststreams(m_stream_veto)) {
350  if (!m_cutflowSvc.empty())
351  {
352  if (m_cutflowSvc.retrieve().isFailure())
353  {
354  ATH_MSG_ERROR("Cannot get ICutFlowSvc interface.");
355  return StatusCode::RECOVERABLE;
356  }
357  }
358  }
359 
360  //Now that everything is said and done, match filters with stream and logic in CutFlowSvc
361  StatusCode sc ATLAS_THREAD_SAFE = DeclareToCutFlowSvc();
362  // ^ FIXME: Should really mark start() as ATLAS_NOT_THREAD_SAFE but that is currently
363  // not possible with the thread-checker.
364 
365  return sc;
366 }
367 
368 StatusCode DecisionSvc::DeclareToCutFlowSvc ATLAS_NOT_THREAD_SAFE ()
369 {
370  // Declare all decisions to CutFlowSvc for bookkeeping
371 
372  // Loop over all streams
373  for(auto ait = m_streamNames.begin();
374  ait != m_streamNames.end(); ++ait) {
375  const std::string &streamName=*ait;
376  const std::vector<std::string> *accFilt=nullptr;
377  const std::vector<std::string> *reqFilt=nullptr;
378  const std::vector<std::string> *vetFilt=nullptr;
379  std::vector< const std::vector<std::string>* > totFilt;
380  if (auto itr = m_stream_accept.find(streamName); itr != m_stream_accept.end()) {
381  accFilt = &itr->second;
382  totFilt.push_back(accFilt);
383  }
384  if (auto itr = m_stream_require.find(streamName); itr != m_stream_require.end()) {
385  reqFilt = &itr->second;
386  totFilt.push_back(reqFilt);
387  }
388  if (auto itr = m_stream_veto.find(streamName); itr != m_stream_veto.end()) {
389  vetFilt = &itr->second;
390  totFilt.push_back(vetFilt);
391  }
392 
393  // Now build logicalKey as string of filt ||, &&, ! based on
394  // whether it is accept, require, veto
395  std::string accstring(""), reqstring(""), vetstring("");
396  if(accFilt){
397  for (auto it = accFilt->begin(); it != accFilt->end(); ++it) {
398  if(!accstring.empty()) accstring += "||";
399  else accstring += '(';
400  accstring+=*it;
401  }
402  }
403  if(reqFilt){
404  for (auto it = reqFilt->begin(); it != reqFilt->end(); ++it) {
405  if(!reqstring.empty()) reqstring += "&&";
406  else reqstring += '(';
407  reqstring+=*it;
408  }
409  }
410  if(vetFilt){
411  for (auto it = vetFilt->begin(); it != vetFilt->end(); ++it) {
412  if(!vetstring.empty()) vetstring += "||";
413  else vetstring += '(';
414  vetstring+=*it;
415  }
416  }
417  std::string logicalKey("");
418  if(!accstring.empty()) {
419  accstring += ')';
420  logicalKey += accstring;
421  }
422  if(!reqstring.empty()) {
423  reqstring += ')';
424  if (!logicalKey.empty()) logicalKey += "&&";
425  logicalKey += reqstring;
426  }
427  if(!vetstring.empty()) {
428  vetstring += ')';
429  if (!logicalKey.empty()) logicalKey += "&&";
430  logicalKey += '!';
431  logicalKey += vetstring;
432  }
433  // If no filters, mark as PasThru
434  if (logicalKey.empty()) logicalKey="PassThru";
435  ATH_MSG_DEBUG("stream " << streamName << " uses logic " << logicalKey);
436 
437  // Now actually declare to the cutflowsvc
438  for(auto vec = totFilt.begin();
439  vec != totFilt.end(); ++vec) {
440  ATH_MSG_DEBUG("Declaring logic " << logicalKey << " for " << streamName);
441  for (auto filter = (*vec)->begin();
442  filter != (*vec)->end(); ++filter) {
443  if(!m_cutflowSvc.empty()) {m_cutflowSvc->registerTopFilter( (*filter), logicalKey, 2, streamName, true ); } // TODO: validate
444  }
445  }
446  }
447 
448  return StatusCode::SUCCESS;
449 }
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
DecisionSvc::m_stream_accept
std::map< std::string, std::vector< std::string > > m_stream_accept
Maps of streams – algorithm names' vectors.
Definition: DecisionSvc.h:89
DecisionSvc::m_streamNames
std::vector< std::string > m_streamNames
Definition: DecisionSvc.h:93
get_generator_info.result
result
Definition: get_generator_info.py:21
python.SystemOfUnits.m
int m
Definition: SystemOfUnits.py:91
python.PerfMonSerializer.p
def p
Definition: PerfMonSerializer.py:743
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
find
std::string find(const std::string &s)
return a remapped string
Definition: hcg.cxx:135
DecisionSvc::getVetoAlgs
virtual const std::vector< std::string > getVetoAlgs(const std::string &stream) const override
Return list of VetoAlg names.
Definition: DecisionSvc.cxx:220
skel.it
it
Definition: skel.GENtoEVGEN.py:423
DecisionSvc::~DecisionSvc
virtual ~DecisionSvc()
Destructor.
Definition: DecisionSvc.cxx:25
DecisionSvc::DecisionSvc
DecisionSvc(const std::string &name, ISvcLocator *pSvcLocator)
Constructor.
Definition: DecisionSvc.cxx:12
vec
std::vector< size_t > vec
Definition: CombinationsGeneratorTest.cxx:12
DecisionSvc::m_stream_veto
std::map< std::string, std::vector< std::string > > m_stream_veto
Definition: DecisionSvc.h:91
DecisionSvc::m_evtStore
ServiceHandle< StoreGateSvc > m_evtStore
Definition: DecisionSvc.h:97
AthenaPoolTestWrite.stream
string stream
Definition: AthenaPoolTestWrite.py:12
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
covarianceTool.filter
filter
Definition: covarianceTool.py:514
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
DecisionSvc::m_frozen
bool m_frozen
Definition: DecisionSvc.h:96
DecisionSvc.h
trigmenu_modify_prescale_json.fp
fp
Definition: trigmenu_modify_prescale_json.py:53
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
DecisionSvc::m_algstateSvc
ServiceHandle< IAlgExecStateSvc > m_algstateSvc
Definition: DecisionSvc.h:99
DecisionSvc::getRequireAlgs
virtual const std::vector< std::string > getRequireAlgs(const std::string &stream) const override
Return list of RequireAlg names.
Definition: DecisionSvc.cxx:211
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
CHECK
#define CHECK(...)
Evaluate an expression and check for errors.
Definition: Control/AthenaKernel/AthenaKernel/errorcheck.h:422
DecisionSvc::initialize
virtual StatusCode initialize() override
Gaudi Service Implementation.
Definition: DecisionSvc.cxx:30
DecisionSvc::addRequireAlg
virtual StatusCode addRequireAlg(const std::string &name, const std::string &stream) override
Add an algorithm to the list of RequireAlgs of a stream.
Definition: DecisionSvc.cxx:173
DecisionSvc::finalize
virtual StatusCode finalize() override
Definition: DecisionSvc.cxx:57
DecisionSvc::isEventAccepted
virtual bool isEventAccepted(const std::string &stream) const override
Test whether this event should be output.
Definition: DecisionSvc.cxx:235
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
DecisionSvc::interpretAlgMap
StatusCode interpretAlgMap()
Definition: DecisionSvc.cxx:64
DecisionSvc::addStream
virtual StatusCode addStream(const std::string &stream) override
Add a stream.
Definition: DecisionSvc.cxx:98
AthenaAttributeList.h
An AttributeList represents a logical row of attributes in a metadata table. The name and type of eac...
AthenaPoolExample_Copy.streamName
string streamName
Definition: AthenaPoolExample_Copy.py:39
DecisionSvc::start
virtual StatusCode start() override
Definition: DecisionSvc.cxx:334
DecisionSvc::addAcceptAlg
virtual StatusCode addAcceptAlg(const std::string &name, const std::string &stream) override
Add an algorithm to the list of AcceptAlgs of a stream.
Definition: DecisionSvc.cxx:163
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
DecisionSvc::m_cutflowSvc
ServiceHandle< ICutFlowSvc > m_cutflowSvc
Definition: DecisionSvc.h:98
DecisionSvc::fillMap
StatusCode fillMap(std::map< std::string, std::vector< std::string > > &streamsModeMap, const std::string &name, const std::string &stream)
Definition: DecisionSvc.cxx:121
DecisionSvc::getStreams
virtual const std::vector< std::string > getStreams() const override
Return list of Streams.
Definition: DecisionSvc.cxx:195
merge.status
status
Definition: merge.py:17
DecisionSvc::addVetoAlg
virtual StatusCode addVetoAlg(const std::string &name, const std::string &stream) override
Add an algorithm to the list of VetoAlgs of a stream.
Definition: DecisionSvc.cxx:184
calibdata.copy
bool copy
Definition: calibdata.py:27
ATLAS_THREAD_SAFE
#define ATLAS_THREAD_SAFE
Definition: checker_macros.h:211
ATLAS_NOT_THREAD_SAFE
StatusCode DecisionSvc::DeclareToCutFlowSvc ATLAS_NOT_THREAD_SAFE()
Install fatal handler with default options.
Definition: DecisionSvc.cxx:368
DecisionSvc::getAcceptAlgs
virtual const std::vector< std::string > getAcceptAlgs(const std::string &stream) const override
Return list of AcceptAlg names.
Definition: DecisionSvc.cxx:202
DecisionSvc::m_stream_require
std::map< std::string, std::vector< std::string > > m_stream_require
Definition: DecisionSvc.h:90