ATLAS Offline Software
Loading...
Searching...
No Matches
RootNtupleOutputStream.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5#include <cassert>
6#include <string>
7#include <vector>
8#include <fnmatch.h>
9
10// Framework include files
11#include "GaudiKernel/GaudiException.h"
12#include "GaudiKernel/IAlgManager.h"
13#include "GaudiKernel/IClassIDSvc.h"
14#include "GaudiKernel/ISvcLocator.h"
15#include "GaudiKernel/IOpaqueAddress.h"
16#include "GaudiKernel/IProperty.h"
17#include "GaudiKernel/ClassID.h"
18
21
23#include "SGTools/DataProxy.h"
24#include "SGTools/ProxyMap.h"
25#include "SGTools/SGIFolder.h"
26
28
29namespace Athena {
30
31// Standard Constructor
// Construct the output stream algorithm and declare its job-option properties.
//
// name         : algorithm instance name; also used to build the default
//                streamer tool name and as the default "ProcessingTag".
// pSvcLocator  : service locator forwarded to FilteredAlgorithm; asserted non-null.
32RootNtupleOutputStream::RootNtupleOutputStream(const std::string& name, ISvcLocator* pSvcLocator)
33 : FilteredAlgorithm(name, pSvcLocator),
34 m_dataStore("StoreGateSvc", name),
35 m_pCLIDSvc("ClassIDSvc", name),
36 m_events(0),
// Default writing tool is "AthenaOutputStreamTool/<name>Tool", owned by this algorithm.
37 m_streamer(std::string("AthenaOutputStreamTool/") +
38 name + std::string("Tool"), this),
39 m_helperTools(this)
40{
41 assert(pSvcLocator);
// Each declareProperty call below also assigns the member's default value.
42 declareProperty("ItemList", m_itemList);
43 declareProperty("OutputFile", m_outputName="DidNotNameOutput.root");
44 declareProperty("EvtConversionSvc", m_persName="EventPersistencySvc");
45 declareProperty("WritingTool", m_streamer);
// NOTE(review): listing line 46 is absent from this extract; its content is unknown — confirm against the repository.
47 declareProperty("ProcessingTag", m_processTag=name);
48 declareProperty("ForceRead", m_forceRead=false);
49 declareProperty("PersToPers", m_persToPers=false);
50 declareProperty("ExemptPersToPers", m_exemptPersToPers);
51 declareProperty("ProvideDef", m_provideDef=false);
// By default data is streamed during execute and not at the MetaDataStop incident.
52 declareProperty("WriteOnExecute", m_writeOnExecute=true);
53 declareProperty("WriteOnFinalize", m_writeOnFinalize=false);
54 declareProperty("TakeItemsFromInput", m_itemListFromTool=false);
55 declareProperty("HelperTools", m_helperTools);
56
// The third argument is the property's documentation string.
57 declareProperty("DynamicItemList",
58 m_dynamicItemList = false,
59 "dynamic output itemlist:\n" \
60 " if enabled rediscover object list to be written out at each event\n" \
61 " otherwise: reuse the one from the first event.");
62}
63
64// Standard Destructor
67
68// initialize data writer
69StatusCode
71{
72 ATH_MSG_DEBUG("In initialize");
73 if (!this->FilteredAlgorithm::initialize().isSuccess()) {
74 ATH_MSG_ERROR("could not initialize base class");
75 return StatusCode::FAILURE;
76 }
77
78 // Reset the number of events written
79 m_events = 0;
80
81 // set up the SG service:
82 if (!m_dataStore.retrieve().isSuccess()) {
83 ATH_MSG_FATAL("Could not locate default store");
84 return StatusCode::FAILURE;
85 } else {
86 ATH_MSG_DEBUG("Found " << m_dataStore.type() << " store.");
87 }
88 assert(static_cast<bool>(m_dataStore));
89
90 // set up the CLID service:
91 if (!m_pCLIDSvc.retrieve().isSuccess()) {
92 ATH_MSG_FATAL("Could not locate default ClassIDSvc");
93 return StatusCode::FAILURE;
94 }
95 assert(static_cast<bool>(m_pCLIDSvc));
96
97 // Get Output Stream tool for writing
98 if (!m_streamer.retrieve().isSuccess()) {
99 ATH_MSG_FATAL("Can not find " << m_streamer);
100 return StatusCode::FAILURE;
101 }
102
103 const bool extendProvenanceRecord = true;
104 if (!m_streamer->connectServices(m_dataStore.type(),
105 m_persName,
106 extendProvenanceRecord).isSuccess()) {
107 ATH_MSG_FATAL("Unable to connect services");
108 return StatusCode::FAILURE;
109 }
110
111 if (!m_helperTools.retrieve().isSuccess()) {
112 ATH_MSG_FATAL("Can not find " << m_helperTools);
113 return StatusCode::FAILURE;
114 }
115 ATH_MSG_INFO("Found " << m_helperTools << endmsg << "Data output: " << m_outputName);
116
117 bool allgood = true;
118 for (std::vector<ToolHandle<IAthenaOutputTool> >::iterator
119 iter = m_helperTools.begin(),
120 iend = m_helperTools.end();
121 iter != iend;
122 ++iter) {
123 if (!(*iter)->postInitialize().isSuccess()) {
124 allgood = false;
125 }
126 }
127 if (!allgood) {
128 ATH_MSG_ERROR("problem in postInitialize of a helper tool");
129 return StatusCode::FAILURE;
130 }
131
132 // For 'write on finalize', we set up listener for 'MetaDataStop'
133 // and perform write at this point. This happens at 'stop' of the
134 // event selector. RDS 04/2010
135 if (m_writeOnFinalize) {
136 // Set to be listener for end of event
137 ServiceHandle<IIncidentSvc> incSvc("IncidentSvc", this->name());
138 if (!incSvc.retrieve().isSuccess()) {
139 ATH_MSG_ERROR("Cannot get the IncidentSvc");
140 return StatusCode::FAILURE;
141 } else {
142 ATH_MSG_DEBUG("Retrieved IncidentSvc");
143 }
144 incSvc->addListener(this, "MetaDataStop", 50);
145 ATH_MSG_DEBUG("Added MetaDataStop listener");
146 }
147 ATH_MSG_DEBUG("End initialize");
148 return StatusCode::SUCCESS;
149}
150
151void
153{
154 ATH_MSG_DEBUG("handle() incident type: " << inc.type());
155 static const std::string s_METADATASTOP = "MetaDataStop";
156 if (inc.type() == s_METADATASTOP) {
157 // Moved preFinalize of helper tools to stop - want to optimize the
158 // output file in finalize RDS 12/2009
159 for (std::vector<ToolHandle<IAthenaOutputTool> >::iterator
160 iter = m_helperTools.begin(),
161 iend = m_helperTools.end();
162 iter != iend;
163 ++iter) {
164 if (!(*iter)->preFinalize().isSuccess()) {
165 ATH_MSG_ERROR("Cannot finalize helper tool");
166 }
167 }
168 // Always force a final commit in stop - mainly applies to AthenaPool
169 if (m_writeOnFinalize) {
170 if (!write().isSuccess()) { // true mean write AND commit
171 ATH_MSG_ERROR("Cannot write on finalize");
172 }
173 }
174 // If there are no events, then connect the output for the metadata
175 if (m_events == 0) {
176 if (m_streamer->connectOutput(m_outputName).isFailure()) {
177 ATH_MSG_ERROR("Unable to connect to file " << m_outputName);
178 }
179 }
180 }
181 ATH_MSG_INFO("Records written: " << m_events);
182 ATH_MSG_DEBUG("Leaving handle");
183}
184
185// terminate data writer
186StatusCode
188{
189 bool failed = false;
190 ATH_MSG_DEBUG("finalize: Optimize output");
191 if (m_events == 0) {
192 if (!m_streamer->commitOutput().isSuccess()) {
193 failed = true;
194 }
195 }
196 if (!m_streamer->finalizeOutput().isSuccess()) {
197 failed = true;
198 }
199 ATH_MSG_DEBUG("finalize: end optimize output");
200 if (!m_helperTools.release().isSuccess()) {
201 failed = true;
202 }
203 if (!m_streamer.release().isSuccess()) {
204 failed = true;
205 }
206 return failed
207 ? StatusCode::FAILURE
208 : StatusCode::SUCCESS;
209}
210
211StatusCode
213{
214 bool failed = false;
215 for (std::vector<ToolHandle<IAthenaOutputTool> >::iterator
216 iter = m_helperTools.begin(),
217 iend = m_helperTools.end();
218 iter != iend;
219 ++iter) {
220 if (!(*iter)->preExecute().isSuccess()) {
221 failed = true;
222 }
223 }
224 if (m_writeOnExecute) {
225 if (!write().isSuccess()) {
226 failed = true;
227 }
228 }
229 for (std::vector<ToolHandle<IAthenaOutputTool> >::iterator
230 iter = m_helperTools.begin(),
231 iend = m_helperTools.end();
232 iter != iend;
233 ++iter) {
234 if(!(*iter)->postExecute().isSuccess()) {
235 failed = true;
236 }
237 }
238
239 return failed
240 ? StatusCode::FAILURE
241 : StatusCode::SUCCESS;
242}
243
244// Work entry point
// Stream the currently selected objects to the output and commit them.
// NOTE(review): the function-name line (listing line 246) is missing from this
// extract; presumably this is RootNtupleOutputStream::write() — confirm.
// Returns FAILURE if streamObjects fails non-recoverably or commitOutput fails,
// SUCCESS otherwise.
245StatusCode
247{
248 bool failed = false;
249 // Clear any previously existing item list
// NOTE(review): the statement belonging to the comment above (listing line 250)
// is missing from this extract — presumably a call to clearSelection(); confirm.
251 // Test whether this event should be output
252 if (isEventAccepted()) {
253 // Connect the output file to the service
// NOTE(review): there is no else branch: if connectOutput fails nothing is
// written and the function still returns SUCCESS.
254 if (m_streamer->connectOutput(m_outputName).isSuccess()) {
255 // First check if there are any new items in the list
// NOTE(review): the statement belonging to the comment above (listing line 256)
// is missing from this extract — presumably a call to collectAllObjects(); confirm.
257 StatusCode sc = m_streamer->streamObjects(m_objects);
258 // Do final check of streaming
259 if (!sc.isSuccess()) {
// Only a non-recoverable failure marks the write as failed; a recoverable
// one is merely logged at DEBUG level.
260 if (!sc.isRecoverable()) {
261 ATH_MSG_FATAL("streamObjects failed.");
262 failed = true;
263 } else {
264 ATH_MSG_DEBUG("streamObjects failed.");
265 }
266 }
// The commit is attempted even when streamObjects reported a failure.
267 sc = m_streamer->commitOutput();
268 if (!sc.isSuccess()) {
269 ATH_MSG_FATAL("commitOutput failed.");
270 failed = true;
271 }
// NOTE(review): listing line 272 is missing from this extract; content unknown.
// The event counter only advances when neither streaming nor commit failed.
273 if (!failed) {
274 m_events++;
275 }
276 }
277 }
278
279 return failed
280 ? StatusCode::FAILURE
281 : StatusCode::SUCCESS;
282}
283
284// Clear collected object list
285inline
286void
288{
289 m_objects.resize(0);
290 if (m_dynamicItemList) {
291 m_selection.resize(0);
292 }
293}
294
295void
297{
298 typedef std::vector<SG::FolderItem> Items_t;
299
300 if (!m_dynamicItemList && !m_selection.empty()) {
301 // reuse object list from previous event.
302 } else {
303
304 // if (m_itemListFromTool) {
305 // //FIXME:
306 // // if (!m_streamer->getInputItemList(&*m_p2BWritten).isSuccess()) {
307 // // ATH_MSG_WARNING("collectAllObjects() could not get ItemList from Tool.");
308 // // }
309 // }
310
311 static const std::string s_plus = "+";
312 static const std::string s_dash = "-";
313 typedef std::vector<const SG::DataProxy*> Proxies_t;
314 Proxies_t proxies = m_dataStore->proxies();
315
316 const std::vector<std::string>& items = m_itemList.value();
317 std::vector<std::string> toremove;
318 toremove.reserve(items.size());
319
320 Items_t selection;
321 selection.reserve(items.size());
322
323 for (Proxies_t::const_iterator
324 iproxy = proxies.begin(),
325 iend = proxies.end();
326 iproxy != iend;
327 ++iproxy) {
328 const SG::DataProxy *proxy = *iproxy;
329 if (!proxy) {
330 continue;
331 }
332 for (std::vector<std::string>::const_iterator
333 jkey = items.begin(),
334 jend = items.end();
335 jkey != jend;
336 ++jkey) {
337 if (!jkey->empty()) {
338 if ((*jkey)[0] == s_dash[0]) {
339 toremove.push_back(jkey->substr(1, std::string::npos));
340 continue;
341 }
342 std::string key = *jkey;
343 if ((*jkey)[0] == s_plus[0]) {
344 key = jkey->substr(1, std::string::npos);
345 }
346 int o = fnmatch(key.c_str(),
347 proxy->name().c_str(),
348 FNM_PATHNAME);
349 if (o == 0) {
350 // accept
351 selection.push_back(SG::FolderItem(proxy->clID(), proxy->name()));
352 break;
353 }
354 }
355 }
356 }
357
358 m_selection.reserve(selection.size());
359 if (toremove.empty()) {
360 m_selection = std::move(selection);
361 } else {
362 for(Items_t::const_iterator
363 isel=selection.begin(),
364 iend=selection.end();
365 isel != iend;
366 ++isel) {
367 const std::string &name = isel->key();
368 bool keep = true;
369 for (std::vector<std::string>::const_iterator
370 jkey = toremove.begin(),
371 jend = toremove.end();
372 jkey != jend;
373 ++jkey) {
374 const std::string& key = *jkey;
375 int o = fnmatch(key.c_str(),
376 name.c_str(),
377 FNM_PATHNAME);
378 if (o == 0) {
379 // reject
380 keep = false;
381 break;
382 }
383 }
384 if (keep) {
385 m_selection.push_back(*isel);
386 }
387 }
388 }
389 }
390
391 for (Items_t::const_iterator
392 itr = m_selection.begin(),
393 iend = m_selection.end();
394 itr != iend;
395 ++itr) {
396 const std::string &name = itr->key();
397 SG::DataProxy *proxy = m_dataStore->proxy(itr->id(), name);
398 if (NULL == proxy) {
399 continue;
400 }
401 if (m_forceRead && proxy->isValid()) {
402 if (!m_persToPers) {
403 if (NULL == proxy->accessData()) {
404 ATH_MSG_ERROR(" Could not get data object for id "
405 << proxy->clID() << ",\"" << proxy->name());
406 }
407 } else if (true /*m_exemptPersToPers.find(item.id()) != m_exemptPersToPers.end()*/) {
408 if (NULL == proxy->accessData()) {
409 ATH_MSG_ERROR(" Could not get data object for id "
410 << proxy->clID() << ",\"" << proxy->name());
411 }
412 }
413 }
414 DataObject *obj = proxy->object();
415 if (NULL != obj) {
416 m_objects.push_back(obj);
417 ATH_MSG_DEBUG(" Added object " << proxy->clID() << ",\"" << proxy->name() << "\"");
418 // if (m_checkNumberOfWrites && !m_provideDef) {
419 // std::string tn;
420 // if (!m_pCLIDSvc->getTypeNameOfID(item.id(), tn).isSuccess()) {
421 // ATH_MSG_ERROR(" Could not get type name for id "
422 // << item.id() << ",\"" << itemProxy->name());
423 // } else {
424 // tn += '_' + itemProxy->name();
425 // CounterMapType::iterator cit = m_objectWriteCounter.find(tn);
426 // if (cit == m_objectWriteCounter.end()) {
427 // // First time through
428 // //std::pair<CounterMapType::iterator, bool> result =
429 // m_objectWriteCounter.insert(CounterMapType::value_type(tn, 1));
430 // } else {
431 // // set to next iteration (to avoid double counting)
432 // // StreamTools will eliminate duplicates.
433 // (*cit).second = m_events + 1;
434 // }
435 // }
436 // }
437 // } else if (!m_forceRead && m_persToPers && proxy->isValid()) {
438 // tAddr = itemProxy->transientAddress();
439 } //if data object there
440 }
441
442 return;
443}
444
445} //> ns Athena
446
#define endmsg
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_DEBUG(x)
Interface to an output stream tool.
This file contains the class definition for the IAthenaOutputTool class.
static Double_t sc
Gaudi::Details::PropertyBase & declareProperty(Gaudi::Property< T, V, H > &t)
std::vector< SG::FolderItem > m_selection
list of selected proxies.
void clearSelection()
Clear list of selected objects.
bool m_dynamicItemList
dynamic output itemlist: if enabled rediscover object list to be written out at each event otherwise:...
virtual StatusCode write()
Stream the data.
bool m_provideDef
set to true to allow defaults being provided for non-existent data objects.
bool m_itemListFromTool
set to write out everything from input DataHeader
ServiceHandle< StoreGateSvc > m_dataStore
handle to the StoreGateSvc store where the data we want to write out resides
void collectAllObjects()
Collect data objects for output streamer list.
void handle(const Incident &incident)
Incident service handle listening for MetaDataStop.
IDataSelector m_objects
Collection of objects being selected.
bool m_persToPers
set to true to allow data objects being copied persistent to persistent (without SG retrieve).
StringArrayProperty m_itemList
Vector of item names.
virtual ~RootNtupleOutputStream()
Standard Destructor.
RootNtupleOutputStream(const std::string &name, ISvcLocator *pSvcLocator)
Standard algorithm Constructor.
bool m_forceRead
set to true to force read of data objects in item list
bool m_writeOnExecute
set to true to trigger streaming of data on execute()
std::string m_persName
Name of the persistency service capable to write data from the store.
StringProperty m_processTag
tag of processing stage:
ToolHandle< IAthenaOutputStreamTool > m_streamer
pointer to AthenaOutputStreamTool
ToolHandleArray< IAthenaOutputTool > m_helperTools
vector of AlgTools that are executed by this stream
bool m_writeOnFinalize
set to true to trigger streaming of data on finalize()
std::string m_outputName
Name of the output file.
int m_events
Number of events written to this output stream.
std::vector< unsigned int > m_exemptPersToPers
bool isEventAccepted() const
Test whether this event should be output.
virtual StatusCode initialize()
FilteredAlgorithm(const std::string &name, ISvcLocator *pSvcLocator)
Standard algorithm Constructor.
a Folder item (data object) is identified by the clid/key pair
STL class.
const std::string selection
Some weak symbol referencing magic... These are declared in AthenaKernel/getMessageSvc....
STL namespace.