ATLAS Offline Software
RootNtupleOutputStream.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include <cassert>
6 #include <string>
7 #include <vector>
8 #include <fnmatch.h>
9 
10 // Framework include files
11 #include "GaudiKernel/GaudiException.h"
12 #include "GaudiKernel/IAlgManager.h"
13 #include "GaudiKernel/IClassIDSvc.h"
14 #include "GaudiKernel/ISvcLocator.h"
15 #include "GaudiKernel/IOpaqueAddress.h"
16 #include "GaudiKernel/IProperty.h"
17 #include "GaudiKernel/ClassID.h"
18 
21 
22 #include "StoreGate/StoreGateSvc.h"
23 #include "SGTools/DataProxy.h"
24 #include "SGTools/ProxyMap.h"
25 #include "SGTools/SGIFolder.h"
26 
27 #include "RootNtupleOutputStream.h"
28 
29 namespace Athena {
30 
31 // Standard Constructor
32 RootNtupleOutputStream::RootNtupleOutputStream(const std::string& name, ISvcLocator* pSvcLocator)
33  : FilteredAlgorithm(name, pSvcLocator),
34  m_dataStore("StoreGateSvc", name),
35  m_pCLIDSvc("ClassIDSvc", name),
36  m_events(0),
37  m_streamer(std::string("AthenaOutputStreamTool/") +
38  name + std::string("Tool"), this),
39  m_helperTools(this)
40 {
41  assert(pSvcLocator);
42  declareProperty("ItemList", m_itemList);
43  declareProperty("OutputFile", m_outputName="DidNotNameOutput.root");
44  declareProperty("EvtConversionSvc", m_persName="EventPersistencySvc");
45  declareProperty("WritingTool", m_streamer);
46  declareProperty("Store", m_dataStore);
47  declareProperty("ProcessingTag", m_processTag=name);
48  declareProperty("ForceRead", m_forceRead=false);
49  declareProperty("PersToPers", m_persToPers=false);
50  declareProperty("ExemptPersToPers", m_exemptPersToPers);
51  declareProperty("ProvideDef", m_provideDef=false);
52  declareProperty("WriteOnExecute", m_writeOnExecute=true);
53  declareProperty("WriteOnFinalize", m_writeOnFinalize=false);
54  declareProperty("TakeItemsFromInput", m_itemListFromTool=false);
55  declareProperty("HelperTools", m_helperTools);
56 
57  declareProperty("DynamicItemList",
58  m_dynamicItemList = false,
59  "dynamic output itemlist:\n" \
60  " if enabled rediscover object list to be written out at each event\n" \
61  " otherwise: reuse the one from the first event.");
62 }
63 
64 // Standard Destructor
66 {}
67 
68 // initialize data writer
71 {
72  ATH_MSG_DEBUG("In initialize");
73  if (!this->FilteredAlgorithm::initialize().isSuccess()) {
74  ATH_MSG_ERROR("could not initialize base class");
75  return StatusCode::FAILURE;
76  }
77 
78  // Reset the number of events written
79  m_events = 0;
80 
81  // set up the SG service:
82  if (!m_dataStore.retrieve().isSuccess()) {
83  ATH_MSG_FATAL("Could not locate default store");
84  return StatusCode::FAILURE;
85  } else {
86  ATH_MSG_DEBUG("Found " << m_dataStore.type() << " store.");
87  }
88  assert(static_cast<bool>(m_dataStore));
89 
90  // set up the CLID service:
91  if (!m_pCLIDSvc.retrieve().isSuccess()) {
92  ATH_MSG_FATAL("Could not locate default ClassIDSvc");
93  return StatusCode::FAILURE;
94  }
95  assert(static_cast<bool>(m_pCLIDSvc));
96 
97  // Get Output Stream tool for writing
98  if (!m_streamer.retrieve().isSuccess()) {
99  ATH_MSG_FATAL("Can not find " << m_streamer);
100  return StatusCode::FAILURE;
101  }
102 
103  const bool extendProvenanceRecord = true;
104  if (!m_streamer->connectServices(m_dataStore.type(),
105  m_persName,
106  extendProvenanceRecord).isSuccess()) {
107  ATH_MSG_FATAL("Unable to connect services");
108  return StatusCode::FAILURE;
109  }
110 
111  if (!m_helperTools.retrieve().isSuccess()) {
112  ATH_MSG_FATAL("Can not find " << m_helperTools);
113  return StatusCode::FAILURE;
114  }
115  ATH_MSG_INFO("Found " << m_helperTools << endmsg << "Data output: " << m_outputName);
116 
117  bool allgood = true;
118  for (std::vector<ToolHandle<IAthenaOutputTool> >::iterator
119  iter = m_helperTools.begin(),
120  iend = m_helperTools.end();
121  iter != iend;
122  ++iter) {
123  if (!(*iter)->postInitialize().isSuccess()) {
124  allgood = false;
125  }
126  }
127  if (!allgood) {
128  ATH_MSG_ERROR("problem in postInitialize of a helper tool");
129  return StatusCode::FAILURE;
130  }
131 
132  // For 'write on finalize', we set up listener for 'MetaDataStop'
133  // and perform write at this point. This happens at 'stop' of the
134  // event selector. RDS 04/2010
135  if (m_writeOnFinalize) {
136  // Set to be listener for end of event
137  ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", this->name());
138  if (!incSvc.retrieve().isSuccess()) {
139  ATH_MSG_ERROR("Cannot get the IncidentSvc");
140  return StatusCode::FAILURE;
141  } else {
142  ATH_MSG_DEBUG("Retrieved IncidentSvc");
143  }
144  incSvc->addListener(this, "MetaDataStop", 50);
145  ATH_MSG_DEBUG("Added MetaDataStop listener");
146  }
147  ATH_MSG_DEBUG("End initialize");
148  return StatusCode::SUCCESS;
149 }
150 
151 void
152 RootNtupleOutputStream::handle(const Incident& inc)
153 {
154  ATH_MSG_DEBUG("handle() incident type: " << inc.type());
155  static const std::string s_METADATASTOP = "MetaDataStop";
156  if (inc.type() == s_METADATASTOP) {
157  // Moved preFinalize of helper tools to stop - want to optimize the
158  // output file in finalize RDS 12/2009
159  for (std::vector<ToolHandle<IAthenaOutputTool> >::iterator
160  iter = m_helperTools.begin(),
161  iend = m_helperTools.end();
162  iter != iend;
163  ++iter) {
164  if (!(*iter)->preFinalize().isSuccess()) {
165  ATH_MSG_ERROR("Cannot finalize helper tool");
166  }
167  }
168  // Always force a final commit in stop - mainly applies to AthenaPool
169  if (m_writeOnFinalize) {
170  if (!write().isSuccess()) { // true mean write AND commit
171  ATH_MSG_ERROR("Cannot write on finalize");
172  }
173  }
174  // If there are no events, then connect the output for the metadata
175  if (m_events == 0) {
176  if (m_streamer->connectOutput(m_outputName).isFailure()) {
177  ATH_MSG_ERROR("Unable to connect to file " << m_outputName);
178  }
179  }
180  }
181  ATH_MSG_INFO("Records written: " << m_events);
182  ATH_MSG_DEBUG("Leaving handle");
183 }
184 
185 // terminate data writer
186 StatusCode
188 {
189  bool failed = false;
190  ATH_MSG_DEBUG("finalize: Optimize output");
191  if (m_events == 0) {
192  if (!m_streamer->commitOutput().isSuccess()) {
193  failed = true;
194  }
195  }
196  if (!m_streamer->finalizeOutput().isSuccess()) {
197  failed = true;
198  }
199  ATH_MSG_DEBUG("finalize: end optimize output");
200  if (!m_helperTools.release().isSuccess()) {
201  failed = true;
202  }
203  if (!m_streamer.release().isSuccess()) {
204  failed = true;
205  }
206  return failed
207  ? StatusCode::FAILURE
208  : StatusCode::SUCCESS;
209 }
210 
213 {
214  bool failed = false;
215  for (std::vector<ToolHandle<IAthenaOutputTool> >::iterator
216  iter = m_helperTools.begin(),
217  iend = m_helperTools.end();
218  iter != iend;
219  ++iter) {
220  if (!(*iter)->preExecute().isSuccess()) {
221  failed = true;
222  }
223  }
224  if (m_writeOnExecute) {
225  if (!write().isSuccess()) {
226  failed = true;
227  }
228  }
229  for (std::vector<ToolHandle<IAthenaOutputTool> >::iterator
230  iter = m_helperTools.begin(),
231  iend = m_helperTools.end();
232  iter != iend;
233  ++iter) {
234  if(!(*iter)->postExecute().isSuccess()) {
235  failed = true;
236  }
237  }
238 
239  return failed
240  ? StatusCode::FAILURE
241  : StatusCode::SUCCESS;
242 }
243 
244 // Work entry point
247 {
248  bool failed = false;
249  // Clear any previously existing item list
250  clearSelection();
251  // Test whether this event should be output
252  if (isEventAccepted()) {
253  // Connect the output file to the service
254  if (m_streamer->connectOutput(m_outputName).isSuccess()) {
255  // First check if there are any new items in the list
257  StatusCode sc = m_streamer->streamObjects(m_objects);
258  // Do final check of streaming
259  if (!sc.isSuccess()) {
260  if (!sc.isRecoverable()) {
261  ATH_MSG_FATAL("streamObjects failed.");
262  failed = true;
263  } else {
264  ATH_MSG_DEBUG("streamObjects failed.");
265  }
266  }
267  sc = m_streamer->commitOutput();
268  if (!sc.isSuccess()) {
269  ATH_MSG_FATAL("commitOutput failed.");
270  failed = true;
271  }
272  clearSelection();
273  if (!failed) {
274  m_events++;
275  }
276  }
277  }
278 
279  return failed
280  ? StatusCode::FAILURE
281  : StatusCode::SUCCESS;
282 }
283 
284 // Clear collected object list
285 inline
286 void
288 {
289  m_objects.resize(0);
290  if (m_dynamicItemList) {
291  m_selection.resize(0);
292  }
293 }
294 
295 void
297 {
298  typedef std::vector<SG::FolderItem> Items_t;
299 
300  if (!m_dynamicItemList && !m_selection.empty()) {
301  // reuse object list from previous event.
302  } else {
303 
304  // if (m_itemListFromTool) {
305  // //FIXME:
306  // // if (!m_streamer->getInputItemList(&*m_p2BWritten).isSuccess()) {
307  // // ATH_MSG_WARNING("collectAllObjects() could not get ItemList from Tool.");
308  // // }
309  // }
310 
311  static const std::string s_plus = "+";
312  static const std::string s_dash = "-";
313  typedef std::vector<const SG::DataProxy*> Proxies_t;
314  Proxies_t proxies = m_dataStore->proxies();
315 
316  const std::vector<std::string>& items = m_itemList.value();
317  std::vector<std::string> toremove;
318  toremove.reserve(items.size());
319 
320  Items_t selection;
321  selection.reserve(items.size());
322 
323  for (Proxies_t::const_iterator
324  iproxy = proxies.begin(),
325  iend = proxies.end();
326  iproxy != iend;
327  ++iproxy) {
328  const SG::DataProxy *proxy = *iproxy;
329  if (!proxy) {
330  continue;
331  }
332  for (std::vector<std::string>::const_iterator
333  jkey = items.begin(),
334  jend = items.end();
335  jkey != jend;
336  ++jkey) {
337  if (!jkey->empty()) {
338  if ((*jkey)[0] == s_dash[0]) {
339  toremove.push_back(jkey->substr(1, std::string::npos));
340  continue;
341  }
342  std::string key = *jkey;
343  if ((*jkey)[0] == s_plus[0]) {
344  key = jkey->substr(1, std::string::npos);
345  }
346  int o = fnmatch(key.c_str(),
347  proxy->name().c_str(),
348  FNM_PATHNAME);
349  if (o == 0) {
350  // accept
351  selection.push_back(SG::FolderItem(proxy->clID(), proxy->name()));
352  break;
353  }
354  }
355  }
356  }
357 
358  m_selection.reserve(selection.size());
359  if (toremove.empty()) {
361  } else {
362  for(Items_t::const_iterator
363  isel=selection.begin(),
364  iend=selection.end();
365  isel != iend;
366  ++isel) {
367  const std::string &name = isel->key();
368  bool keep = true;
369  for (std::vector<std::string>::const_iterator
370  jkey = toremove.begin(),
371  jend = toremove.end();
372  jkey != jend;
373  ++jkey) {
374  const std::string& key = *jkey;
375  int o = fnmatch(key.c_str(),
376  name.c_str(),
377  FNM_PATHNAME);
378  if (o == 0) {
379  // reject
380  keep = false;
381  break;
382  }
383  }
384  if (keep) {
385  m_selection.push_back(*isel);
386  }
387  }
388  }
389  }
390 
391  for (Items_t::const_iterator
392  itr = m_selection.begin(),
393  iend = m_selection.end();
394  itr != iend;
395  ++itr) {
396  const std::string &name = itr->key();
397  SG::DataProxy *proxy = m_dataStore->proxy(itr->id(), name);
398  if (NULL == proxy) {
399  continue;
400  }
401  if (m_forceRead && proxy->isValid()) {
402  if (!m_persToPers) {
403  if (NULL == proxy->accessData()) {
404  ATH_MSG_ERROR(" Could not get data object for id "
405  << proxy->clID() << ",\"" << proxy->name());
406  }
407  } else if (true /*m_exemptPersToPers.find(item.id()) != m_exemptPersToPers.end()*/) {
408  if (NULL == proxy->accessData()) {
409  ATH_MSG_ERROR(" Could not get data object for id "
410  << proxy->clID() << ",\"" << proxy->name());
411  }
412  }
413  }
414  DataObject *obj = proxy->object();
415  if (NULL != obj) {
416  m_objects.push_back(obj);
417  ATH_MSG_DEBUG(" Added object " << proxy->clID() << ",\"" << proxy->name() << "\"");
418  // if (m_checkNumberOfWrites && !m_provideDef) {
419  // std::string tn;
420  // if (!m_pCLIDSvc->getTypeNameOfID(item.id(), tn).isSuccess()) {
421  // ATH_MSG_ERROR(" Could not get type name for id "
422  // << item.id() << ",\"" << itemProxy->name());
423  // } else {
424  // tn += '_' + itemProxy->name();
425  // CounterMapType::iterator cit = m_objectWriteCounter.find(tn);
426  // if (cit == m_objectWriteCounter.end()) {
427  // // First time through
428  // //std::pair<CounterMapType::iterator, bool> result =
429  // m_objectWriteCounter.insert(CounterMapType::value_type(tn, 1));
430  // } else {
431  // // set to next iteration (to avoid double counting)
432  // // StreamTools will eliminate duplicates.
433  // (*cit).second = m_events + 1;
434  // }
435  // }
436  // }
437  // } else if (!m_forceRead && m_persToPers && proxy->isValid()) {
438  // tAddr = itemProxy->transientAddress();
439  } //if data object there
440  }
441 
442  return;
443 }
444 
445 } //> ns Athena
446 
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
Athena::RootNtupleOutputStream::m_streamer
ToolHandle< IAthenaOutputStreamTool > m_streamer
pointer to AthenaOutputStreamTool
Definition: RootNtupleOutputStream.h:101
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
SGIFolder.h
StateLessPT_NewConfig.proxy
proxy
Definition: StateLessPT_NewConfig.py:392
FilteredAlgorithm
algorithm that marks for write data objects in SG
Definition: FilteredAlgorithm.h:33
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
Athena::RootNtupleOutputStream::m_writeOnFinalize
bool m_writeOnFinalize
set to true to trigger streaming of data on finalize()
Definition: RootNtupleOutputStream.h:88
Athena::RootNtupleOutputStream::collectAllObjects
void collectAllObjects()
Collect data objects for output streamer list.
Definition: RootNtupleOutputStream.cxx:296
IAthenaOutputTool.h
This file contains the class definition for the IAthenaOutputTool class.
AthCommonDataStore< AthCommonMsg< Algorithm > >::declareProperty
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T > &t)
Definition: AthCommonDataStore.h:145
Athena::RootNtupleOutputStream::m_exemptPersToPers
std::vector< unsigned int > m_exemptPersToPers
Definition: RootNtupleOutputStream.h:82
FilteredAlgorithm::initialize
virtual StatusCode initialize()
Definition: FilteredAlgorithm.cxx:46
Athena::RootNtupleOutputStream::m_writeOnExecute
bool m_writeOnExecute
set to true to trigger streaming of data on execute()
Definition: RootNtupleOutputStream.h:86
Athena::RootNtupleOutputStream::m_events
int m_events
Number of events written to this output stream.
Definition: RootNtupleOutputStream.h:77
Athena::RootNtupleOutputStream::m_dynamicItemList
bool m_dynamicItemList
dynamic output itemlist: if enabled rediscover object list to be written out at each event otherwise:...
Definition: RootNtupleOutputStream.h:95
ProxyMap.h
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
Athena::RootNtupleOutputStream::clearSelection
void clearSelection()
Clear list of selected objects.
Definition: RootNtupleOutputStream.cxx:287
FilteredAlgorithm::isEventAccepted
bool isEventAccepted() const
Test whether this event should be output.
Definition: FilteredAlgorithm.cxx:128
IAthenaOutputStreamTool.h
Interface to an output stream tool.
Athena::RootNtupleOutputStream::m_objects
IDataSelector m_objects
Collection of objects beeing selected.
Definition: RootNtupleOutputStream.h:75
Athena::RootNtupleOutputStream::handle
void handle(const Incident &incident)
Incident service handle listening for MetaDataStop.
Definition: RootNtupleOutputStream.cxx:152
Athena::RootNtupleOutputStream::execute
virtual StatusCode execute()
Definition: RootNtupleOutputStream.cxx:212
Athena::RootNtupleOutputStream::finalize
virtual StatusCode finalize()
Definition: RootNtupleOutputStream.cxx:187
Athena::RootNtupleOutputStream::m_itemList
StringArrayProperty m_itemList
Vector of item names.
Definition: RootNtupleOutputStream.h:73
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
Athena::RootNtupleOutputStream::m_forceRead
bool m_forceRead
set to true to force read of data objects in item list
Definition: RootNtupleOutputStream.h:79
ReadCalibFromCool.keep
keep
Definition: ReadCalibFromCool.py:85
Athena
Some weak symbol referencing magic...
Definition: AthLegacySequence.h:21
endmsg
#define endmsg
Definition: AnalysisConfig_Ntuple.cxx:63
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
Athena::RootNtupleOutputStream::m_persName
std::string m_persName
Name of the persistency service capable to write data from the store.
Definition: RootNtupleOutputStream.h:61
Athena::RootNtupleOutputStream::write
virtual StatusCode write()
Stream the data.
Definition: RootNtupleOutputStream.cxx:246
Athena::RootNtupleOutputStream::RootNtupleOutputStream
RootNtupleOutputStream(const std::string &name, ISvcLocator *pSvcLocator)
Standard algorithm Constructor.
Definition: RootNtupleOutputStream.cxx:32
selection
std::string selection
Definition: fbtTestBasics.cxx:73
Athena::RootNtupleOutputStream::m_itemListFromTool
bool m_itemListFromTool
set to write out everything from input DataHeader
Definition: RootNtupleOutputStream.h:90
Athena::RootNtupleOutputStream::m_persToPers
bool m_persToPers
set to true to allow data objects being copied persistent to persistent (without SG retrieve).
Definition: RootNtupleOutputStream.h:81
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
TrigJetMonitorAlgorithm.items
items
Definition: TrigJetMonitorAlgorithm.py:79
Athena::RootNtupleOutputStream::~RootNtupleOutputStream
virtual ~RootNtupleOutputStream()
Standard Destructor.
Definition: RootNtupleOutputStream.cxx:65
Athena::RootNtupleOutputStream::m_dataStore
ServiceHandle< StoreGateSvc > m_dataStore
handle to the StoreGateSvc store where the data we want to write out resides
Definition: RootNtupleOutputStream.h:58
Athena::RootNtupleOutputStream::m_helperTools
ToolHandleArray< IAthenaOutputTool > m_helperTools
vector of AlgTools that that are executed by this stream
Definition: RootNtupleOutputStream.h:103
Athena::RootNtupleOutputStream::m_selection
std::vector< SG::FolderItem > m_selection
list of selected proxies.
Definition: RootNtupleOutputStream.h:98
Athena::RootNtupleOutputStream::initialize
virtual StatusCode initialize()
Definition: RootNtupleOutputStream.cxx:70
RootNtupleOutputStream.h
Athena::RootNtupleOutputStream::m_pCLIDSvc
IClassIDSvc_t m_pCLIDSvc
Definition: RootNtupleOutputStream.h:70
Athena::RootNtupleOutputStream::m_processTag
StringProperty m_processTag
tag of processing stage:
Definition: RootNtupleOutputStream.h:67
Athena::RootNtupleOutputStream::m_provideDef
bool m_provideDef
set to true to allow defaults being provided for non-existent data objects.
Definition: RootNtupleOutputStream.h:84
Athena::RootNtupleOutputStream::m_outputName
std::string m_outputName
Name of the output file.
Definition: RootNtupleOutputStream.h:65
python.PyAthena.obj
obj
Definition: PyAthena.py:135
SG::DataProxy
Definition: DataProxy.h:44
StoreGateSvc.h
SG::FolderItem
a Folder item (data object) is identified by the clid/key pair
Definition: SGFolderItem.h:24
physval_make_web_display.failed
bool failed
Definition: physval_make_web_display.py:290
ServiceHandle< IIncidentSvc >
mapkey::key
key
Definition: TElectronEfficiencyCorrectionTool.cxx:37
DataProxy.h