ATLAS Offline Software
AthenaOutputStream.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "AthenaOutputStream.h"
6 
7 // STL include files
8 #include <cassert>
9 #include <format>
10 #include <sstream>
11 
12 // Framework include files
18 #include "GaudiKernel/AlgTool.h"
19 #include "GaudiKernel/ClassID.h"
20 #include "GaudiKernel/GaudiException.h"
21 #include "GaudiKernel/IAlgManager.h"
22 #include "GaudiKernel/IIoComponentMgr.h"
23 #include "GaudiKernel/IOpaqueAddress.h"
24 #include "GaudiKernel/IProperty.h"
25 #include "GaudiKernel/ISvcLocator.h"
26 #include "GaudiKernel/MsgStream.h"
29 #include "SGTools/DataProxy.h"
30 #include "SGTools/ProxyMap.h"
31 #include "SGTools/SGIFolder.h"
33 #include "SGTools/transientKey.h"
34 #include "StoreGate/StoreGateSvc.h"
35 #include "StoreGate/WriteHandle.h"
37 #include "xAODCore/AuxSelection.h"
38 
39 // Local include files
40 #include "AltDataBucket.h"
41 
42 // Standard Constructor
43 AthenaOutputStream::AthenaOutputStream(const std::string& name, ISvcLocator* pSvcLocator)
44  : base_class(name, pSvcLocator),
45  m_currentStore(&m_dataStore),
46  m_p2BWritten(std::format("SG::Folder/{}_TopFolder", name), this),
47  m_compressionDecoderHigh(std::format("SG::Folder/{}_compressed_high", name), this),
48  m_compressionDecoderLow(std::format("SG::Folder/{}_compressed_low", name), this),
49  m_transient(std::format("SG::Folder/{}_transient", name), this),
50  m_streamer(std::format("AthenaOutputStreamTool/{}Tool", name), this)
51 {
52  // Ensure the service locater is good
53  assert(pSvcLocator);
54 
55  // This property depends on the name that's known at construction time
56  // Therefore, do it the old fashioned way
57  declareProperty("WritingTool", m_streamer);
58 
59  // Associate action handlers with the AcceptAlgs,
60  // RequireAlgs & VetoAlgs properties
61  m_itemList.declareUpdateHandler(&AthenaOutputStream::itemListHandler, this);
64 }
65 
66 // Standard Destructor
68  // Clear the internal caches
69  m_streamerMap.clear();
70 }
71 
72 // Initialize data writer
74  ATH_MSG_DEBUG("In initialize");
75 
76  // Initialize the FilteredAlgorithm base
78 
79  // Reset the number of events written
80  m_events = 0;
81 
82  // Set up the SG services
83  ATH_CHECK( m_dataStore.retrieve() );
84  ATH_MSG_DEBUG(std::format("Found {} store.", m_dataStore.typeAndName()));
85  if (!m_metadataItemList.value().empty()) {
86  ATH_CHECK( m_metadataStore.retrieve() );
87  ATH_MSG_DEBUG(std::format("Found {} store.", m_metadataStore.typeAndName()));
88  }
89 
90  // Set up various services
91  ATH_CHECK( m_pCLIDSvc.retrieve() );
92  ATH_CHECK( m_dictLoader.retrieve() );
93  ATH_CHECK( m_tpCnvSvc.retrieve() );
94  ATH_CHECK( m_itemSvc.retrieve() );
95  ATH_CHECK( m_outSeqSvc.retrieve() );
96 
97  // Get Output Stream tool for writing
98  ATH_CHECK( m_streamer.retrieve() );
99  ATH_CHECK( m_streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord) );
100 
101  ATH_CHECK( m_helperTools.retrieve() );
102  ATH_MSG_INFO("Found " << m_helperTools);
103  ATH_MSG_INFO(std::format("Data output: {}", m_outputName.toString()));
104 
105  for (auto& tool : m_helperTools) {
106  ATH_CHECK( tool->postInitialize() );
107  }
108 
109  // Register this algorithm for 'I/O' events
110  ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
111  ATH_CHECK( iomgr.retrieve() );
112  ATH_CHECK( iomgr->io_register(this) );
113  ATH_CHECK( iomgr->io_register(this, IIoComponentMgr::IoMode::WRITE, m_outputName) );
114  ATH_CHECK( this->io_reinit() );
115 
116  // Add an explicit input dependency for everything in our item list
117  // that we know from the configuration is in the transient store.
118  // We don't want to add everything on the list, because configurations
119  // often initialize this with a maximal static list of everything
120  // that could possibly be written.
121  {
122  ATH_CHECK( m_transient.retrieve() );
123  IProperty *pAsIProp = dynamic_cast<IProperty*> (&*m_transient);
124  if (!pAsIProp) {
125  ATH_MSG_FATAL ("Bad folder interface");
126  return StatusCode::FAILURE;
127  }
128  ATH_CHECK (pAsIProp->setProperty("ItemList", m_transientItems.toString()));
129 
130  for (const SG::FolderItem& item : *m_p2BWritten) {
131  // Load ROOT dictionaries now.
132  loadDict (item.id());
133 
134  const std::string& k = item.key();
135  if (k.find('*') != std::string::npos) continue;
136  if (k.find('.') != std::string::npos) continue;
137  for (const SG::FolderItem& titem : *m_transient) {
138  if (titem.id() == item.id() && titem.key() == k) {
139  DataObjID id (item.id(), std::format("{}+{}", m_dataStore.name(), k));
140  this->addDependency (id, Gaudi::DataHandle::Reader);
141  break;
142  }
143  }
144  }
145  m_transient->clear();
146  }
147 
148  // Also load dictionaries for metadata classes.
149  if (!m_metadataItemList.value().empty()) {
150  IProperty *pAsIProp = dynamic_cast<IProperty*> (&*m_transient);
151  if (!pAsIProp) {
152  ATH_MSG_FATAL ("Bad folder interface");
153  return StatusCode::FAILURE;
154  }
155  ATH_CHECK (pAsIProp->setProperty("ItemList", m_metadataItemList.toString()));
156  for (const SG::FolderItem& item : *m_transient) {
157  loadDict (item.id());
158  }
159  m_transient->clear();
160  }
161 
162  // Also make sure we have the dictionary for Token.
163  m_dictLoader->load_type ("Token");
164 
165  // Listen to event range incidents if incident name is configured
166  ATH_CHECK( m_incidentSvc.retrieve() );
167  if( !m_outSeqSvc->incidentName().empty() ) {
168  // use priority 95 to make sure the Output Sequencer goes first (it has priority 100)
169  m_incidentSvc->addListener(this, IncidentType::BeginProcessing, 95);
170  m_incidentSvc->addListener(this, IncidentType::EndProcessing, 95);
171  }
172 
173  // Check compression settings and print some information about the configuration
174  // Both should be between [5, 23] and high compression should be < low compression
175  if(m_compressionBitsHigh < 5 || m_compressionBitsHigh > 23) {
176  ATH_MSG_INFO(std::format("Float compression mantissa bits for high compression "
177  "({}) is outside the allowed range of [5, 23].",
178  m_compressionBitsHigh.toString()));
179  ATH_MSG_INFO("Setting it to the appropriate limit.");
181  }
182  if(m_compressionBitsLow < 5 || m_compressionBitsLow > 23) {
183  ATH_MSG_INFO(std::format("Float compression mantissa bits for low compression "
184  "({}) is outside the allowed range of [5, 23].",
185  m_compressionBitsLow.toString()));
186  ATH_MSG_INFO("Setting it to the appropriate limit.");
188  }
190  ATH_MSG_ERROR(std::format("Float compression mantissa bits for low compression "
191  "({}) is lower than or equal to high compression "
192  "({})! Please check the configuration! ",
193  m_compressionBitsLow.toString(),
194  m_compressionBitsHigh.toString()));
195  return StatusCode::FAILURE;
196  }
197  if(m_compressionListHigh.value().empty() && m_compressionListLow.value().empty()) {
198  ATH_MSG_VERBOSE("Both high and low float compression lists are empty. Float compression will NOT be applied.");
199  } else {
200  ATH_MSG_INFO("Either high or low (or both) float compression lists are defined. Float compression will be applied.");
201  ATH_MSG_INFO(std::format("High compression will use {} mantissa bits, and "
202  "low compression will use {} mantissa bits.",
203  m_compressionBitsHigh.toString(),
204  m_compressionBitsLow.toString()));
205  }
206 
207  // Setup stream name
208  if (m_streamName.empty()) {
209  m_streamName.setValue(this->name());
210  }
211 
212  // Set SG key for selected variable information.
213  m_selVetoesKey = std::format("SelectionVetoes_{}", m_streamName.toString());
215 
216  m_compInfoKey = std::format("CompressionInfo_{}", m_streamName.toString());
218 
219  ATH_MSG_DEBUG("End initialize");
220  return StatusCode::SUCCESS;
221 }
222 
223 // Handle incidents
224 void AthenaOutputStream::handle(const Incident& inc)
225 {
226  ATH_MSG_DEBUG(std::format("handle() incident type: {}", inc.type()));
227  // mutex shared with write() which is called from writeMetaData
228  std::unique_lock<mutex_t> lock(m_mutex);
229 
230  if( inc.type() == "MetaDataStop" ) {
231  if( m_outSeqSvc->inUse() ) {
232  if( m_outSeqSvc->inConcurrentEventsMode() ) {
233  // EventService MT - write metadata and close all remaining substreams
234  while( m_streamerMap.size() > 0 ) {
235  finalizeRange( m_streamerMap.begin()->first );
236  }
237  return;
238  }
239  if( m_outSeqSvc->lastIncident() == "EndEvent" ) {
240  // in r22 EndEvent comes before output writing
241  // - queue metadata writing and disconnect for after Event write
243  return;
244  }
245  }
246  // not in Event Service
247  writeMetaData();
248  }
249  else if( m_outSeqSvc->inUse() ) {
250  // Handle Event Ranges for Event Service
251  EventContext::ContextID_t slot = inc.context().slot();
252  if( slot == EventContext::INVALID_CONTEXT_ID ) {
253  throw GaudiException("Received Incident with invalid slot in ES mode", name(), StatusCode::FAILURE);
254  }
255  auto count_events_in_range = [&](const std::string& range) {
256  return std::count_if(m_slotRangeMap.cbegin(), m_slotRangeMap.cend(),
257  [&](auto& el){return el.second == range;} );
258  };
259  if( inc.type() == IncidentType::BeginProcessing ) {
260  // get the current/old range filename for this slot
261  const std::string rangeFN = m_slotRangeMap[ slot ];
262  // build the new range filename for this slot
263  const std::string newRangeFN = m_outSeqSvc->buildSequenceFileName( m_outputName );
264  if( !rangeFN.empty() and rangeFN != newRangeFN ) {
265  ATH_MSG_INFO(std::format("Slot range change: '{}' -> '{}'", rangeFN, newRangeFN));
266  ATH_MSG_DEBUG(std::format("There are {} slots in use",m_slotRangeMap.size()));
267  for(const auto & range : m_slotRangeMap ) {
268  ATH_MSG_DEBUG(std::format("Slot: {} FN={}", range.first, range.second));
269  }
270  if( count_events_in_range(rangeFN) == 1 ) {
271  finalizeRange( rangeFN );
272  }
273  }
274  ATH_MSG_INFO(std::format("slot {} processing event in range: {}", slot, newRangeFN));
275  m_slotRangeMap[ slot ] = newRangeFN;
276  // remember the RangeID for this slot so we can write metadata *after* a range change
277  m_rangeIDforRangeFN[ newRangeFN ] = m_outSeqSvc->currentRangeID();
278  }
279  else if( inc.type() == IncidentType::EndProcessing ) {
280  ATH_MSG_DEBUG(std::format("There are {} slots in use", m_slotRangeMap.size()));
281  for( const auto& range : m_slotRangeMap ) {
282  ATH_MSG_DEBUG(std::format("Slot: {} FN={}", range.first, range.second));
283  }
284  if( m_slotRangeMap.size() > 1 ) {
285  // if there are multiple slots, we can detect if the range ended with this event
286  // - except the last range, because there is no next range to clear the slot map
287  const std::string rangeFN = m_slotRangeMap[ slot ];
288  if( count_events_in_range(rangeFN) == 1 ) {
289  finalizeRange( rangeFN );
290  m_slotRangeMap[ slot ].clear();
291  }
292  }
293  }
294  }
295  ATH_MSG_DEBUG(std::format("Leaving incident handler for {}", inc.type()));
296 }
297 
298 // Note - this method works in any slot - MetaCont uses the filenames to find objects
299 void AthenaOutputStream::finalizeRange( const std::string & rangeFN )
300 {
301  ATH_MSG_DEBUG(std::format("Writing MetaData to {}", rangeFN));
302  // MN: not calling StopMetaData Incident here but directly writeMetaData() - OK for Sim, check others
303  // metadata tools like CutFlowSvc are not able to handle this yet
304  const std::string rememberID = m_outSeqSvc->setRangeID( m_rangeIDforRangeFN[ rangeFN ] );
305  writeMetaData( rangeFN );
306  m_outSeqSvc->setRangeID( rememberID );
307 
308  ATH_MSG_INFO(std::format("Finished writing Event Sequence to {}", rangeFN));
309  auto strm_iter = m_streamerMap.find( rangeFN );
310  strm_iter->second->finalizeOutput().ignore();
311  strm_iter->second->finalize().ignore();
312  m_streamerMap.erase( strm_iter );
313  m_outSeqSvc->publishRangeReport( rangeFN );
314 }
315 
316 // Method to write MetaData for this stream
317 // in ES mode the range substream is determined by the current Event slot
318 // called from the incident handler - returns void and throws GaudiExceptions on errors
320 {
321  // use main stream tool by default, or per outputFile in ES mode
322  IAthenaOutputStreamTool* streamer = outputFN.empty()? &*m_streamer : m_streamerMap[outputFN].get();
323 
324  for (auto& tool : m_helperTools) {
325  if (!tool->preFinalize().isSuccess()) {
326  throw GaudiException("Cannot finalize helper tool", name(), StatusCode::FAILURE);
327  }
328  }
329  if( m_metaDataSvc->prepareOutput(outputFN).isFailure() ) {
330  throw GaudiException("Failed on MetaDataSvc prepareOutput", name(), StatusCode::FAILURE);
331  }
332  // lock all metadata to prevent updates during writing
334 
335  // Prepare the WriteDataHeaderForms incident
336  std::string DHFWriteIncidentfileName = m_outSeqSvc->buildSequenceFileName(m_outputName);
337  // remove technology from the name
338  size_t pos = DHFWriteIncidentfileName.find(':');
339  if( pos != std::string::npos ) DHFWriteIncidentfileName = DHFWriteIncidentfileName.substr(pos+1);
340  FileIncident incident(name(), "WriteDataHeaderForms", DHFWriteIncidentfileName);
341  m_incidentSvc->fireIncident(incident);
342 
343  ATH_MSG_DEBUG("metadataItemList: " << m_metadataItemList.value() );
344  if (!m_metadataItemList.value().empty()) {
346  StatusCode status = streamer->connectServices(m_metadataStore.typeAndName(), m_persName, false);
347  if (status.isFailure()) {
348  throw GaudiException("Unable to connect metadata services", name(), StatusCode::FAILURE);
349  }
350  m_outputAttributes = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
351  m_p2BWritten->clear();
352  IProperty *pAsIProp(nullptr);
353  if ((m_p2BWritten.retrieve()).isFailure() ||
354  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
355  (pAsIProp->setProperty("ItemList", m_metadataItemList.toString())).isFailure()) {
356  throw GaudiException("Folder property [metadataItemList] not found", name(), StatusCode::FAILURE);
357  }
358  if (write().isFailure()) {
359  throw GaudiException("Cannot write metadata", name(), StatusCode::FAILURE);
360  }
361  FileIncident incident(name(), "WriteDataHeaderForms", DHFWriteIncidentfileName + m_outputAttributes);
362  m_incidentSvc->fireIncident(incident);
363 
364  m_outputAttributes.clear();
367  if (status.isFailure()) {
368  throw GaudiException("Unable to re-connect services", name(), StatusCode::FAILURE);
369  }
370  m_p2BWritten->clear();
371  if ((pAsIProp->setProperty(m_itemList)).isFailure()) {
372  throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
373  }
374  ATH_MSG_DEBUG(std::format("Metadata items written: {}", m_metadataItemList.value().size()));
375  }
376 }
377 
378 // Terminate data writer
380 {
381  bool failed = false;
382  ATH_MSG_DEBUG("finalize: Optimize output");
383  // Connect the output file to the service
384  if (!m_streamer->finalizeOutput().isSuccess()) {
385  failed = true;
386  }
387  ATH_MSG_DEBUG("finalize: end optimize output");
388  // Release the tools
389  if (!m_helperTools.release().isSuccess()) {
390  failed = true;
391  }
392  if (!m_streamer.release().isSuccess()) {
393  failed = true;
394  }
395  if (failed) {
396  return(StatusCode::FAILURE);
397  }
398  // Clear the internal caches
399  m_objects.clear();
400  m_objects.shrink_to_fit();
401  m_ownedObjects.clear();
402  m_altObjects.clear();
403  return(StatusCode::SUCCESS);
404 }
405 
406 // Execute data writer
408  bool failed = false;
409  // Call tool preExecute prior to writing
410  for (auto& tool : m_helperTools) {
411  if (!tool->preExecute().isSuccess()) {
412  failed = true;
413  }
414  }
415  // Write the event if the event is accepted
416  if (isEventAccepted()) {
417  if (write().isFailure()) {
418  failed = true;
419  }
420  }
421  // Call tool postExecute after writing
422  for (auto& tool : m_helperTools) {
423  if(!tool->postExecute().isSuccess()) {
424  failed = true;
425  }
426  }
427  // See if we should write metadata and do if so
429  writeMetaData();
431  // finalize will disconnect output
432  if( !finalize().isSuccess() ) {
433  failed = true;
434  }
435  }
436  if (failed) {
437  return(StatusCode::FAILURE);
438  }
439  return(StatusCode::SUCCESS);
440 }
441 
442 // The main method that performs the writing
444  bool failed = false;
445  IAthenaOutputStreamTool* streamer = &*m_streamer;
446  std::string outputFN;
447 
448  std::unique_lock<mutex_t> lock(m_mutex);
449  outputFN = m_outSeqSvc->buildSequenceFileName( m_outputName );
450 
451  // Handle Event Ranges
452  if( m_outSeqSvc->inUse() and m_outSeqSvc->inConcurrentEventsMode() ) {
453  ATH_MSG_DEBUG(std::format("Writing event sequence to {}", outputFN));
454  streamer = m_streamerMap[ outputFN ].get();
455  if( !streamer ) {
456  // new range, needs a new streamer tool
457  IAlgTool* st = AlgTool::Factory::create( m_streamer->type(), m_streamer->type(), m_streamer->name(), this ).release();
458  st->addRef();
459  streamer = dynamic_cast<IAthenaOutputStreamTool*>( st );
460  IProperty *mstreamer_props = dynamic_cast<IProperty*> (&*m_streamer);
461  IProperty *streamer_props = dynamic_cast<IProperty*> (&*streamer);
462  for ( const auto& prop : mstreamer_props->getProperties() ) {
463  ATH_CHECK( streamer_props->setProperty( *prop ) );
464  }
465  if( !streamer or streamer->initialize().isFailure()
466  or streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord).isFailure() ) {
467  ATH_MSG_FATAL(std::format("Unable to initialize OutputStreamTool for {}", outputFN));
468  return StatusCode::FAILURE;
469  }
470  m_streamerMap[ outputFN ].reset( streamer );
471  }
472  }
473 
474  // Clear any previously existing item list
475  // and collect all objects that are asked to be written out
476  clearSelection();
478 
479  // keep a local copy of the object lists so they are not overwritten when we release the lock
480  IDataSelector objects = std::move( m_objects );
481  IDataSelector altObjects = std::move( m_altObjects );
482  std::vector<std::unique_ptr<DataObject> > ownedObjects = std::move( m_ownedObjects );
483 
484  // prepare before releasing lock because m_outputAttributes change in metadataStop
485  const std::string connectStr = outputFN + m_outputAttributes;
486 
487  for (auto& tool : m_helperTools) {
488  ATH_CHECK( tool->preStream() );
489  }
490 
491  // MN: would be nice to release the Stream lock here
492  // lock.unlock();
493 
494  // Connect the output file to the service
495  if (!streamer->connectOutput(connectStr).isSuccess()) {
496  ATH_MSG_FATAL("Could not connectOutput");
497  return StatusCode::FAILURE;
498  }
499  ATH_MSG_DEBUG(std::format("connectOutput done for {}", outputFN));
500  StatusCode currentStatus = streamer->streamObjects(objects, connectStr);
501  // Do final check of streaming
502  if (!currentStatus.isSuccess()) {
503  if (!currentStatus.isRecoverable()) {
504  ATH_MSG_FATAL("streamObjects failed.");
505  failed = true;
506  } else {
507  ATH_MSG_DEBUG("streamObjects failed.");
508  }
509  }
510  bool doCommit = false;
511  if (!streamer->commitOutput(doCommit).isSuccess()) {
512  ATH_MSG_FATAL("commitOutput failed.");
513  failed = true;
514  }
515  if (failed) {
516  return(StatusCode::FAILURE);
517  }
518  m_events++;
519  return(StatusCode::SUCCESS);
520 }
521 
522 // Clear collected object list
524  m_objects.clear();
525  m_ownedObjects.clear();
526  m_altObjects.clear();
527 }
528 
529 // Collect objects
531  if (m_itemListFromTool) {
532  if (!m_streamer->getInputItemList(&*m_p2BWritten).isSuccess()) {
533  ATH_MSG_WARNING("collectAllObjects() could not get ItemList from Tool.");
534  }
535  }
536 
537  // This holds the vetoes for the AuxID selection
538  auto vetoes = std::make_unique<SG::SelectionVetoes>();
539  // This holds the lossy float compression information
540  auto compInfo = std::make_unique<SG::CompressionInfo>();
541 
542  m_p2BWritten->updateItemList(true);
543  // Collect all objects that need to be persistified:
544  for (const auto& i : *m_p2BWritten) {
545  ATH_CHECK( addItemObjects(i, *vetoes, *compInfo) );
546  }
547 
548  // If there were any variable selections, record the information in SG.
549  if (!vetoes->empty()) {
550  ATH_CHECK( SG::makeHandle (m_selVetoesKey).record (std::move (vetoes)) );
551  }
552 
553  // Store the lossy float compression information in the SG.
554  if (!compInfo->empty()) {
555  ATH_CHECK( SG::makeHandle (m_compInfoKey).record (std::move (compInfo)) );
556  }
557 
558  return StatusCode::SUCCESS;
559 }
560 
561 // Build a list of objects we're going to write out
562 // This function also builds the list of vetoed AuxIDs
563 // and the lossy float compression lists.
565  SG::SelectionVetoes& vetoes,
566  SG::CompressionInfo& compInfo)
567 {
568  // anything after a dot is a list of dynamic Aux attributes, separated by dots
569  size_t dotpos = item.key().find('.');
570  std::string item_key, aux_attr;
571  if( dotpos != std::string::npos ) {
572  item_key = item.key().substr(0, dotpos+1);
573  aux_attr = item.key().substr(dotpos+1);
574  } else {
575  item_key = item.key();
576  }
577  CLID item_id = item.id();
578  ATH_MSG_DEBUG(std::format("addItemObjects({},\"{}\") called", item_id, item_key));
579  ATH_MSG_DEBUG(std::format(" Key:{}", item_key));
580  if( aux_attr.size() ) {
581  ATH_MSG_DEBUG(std::format(" Aux Attr:{}", aux_attr));
582  }
583 
584  // Here we build the list of attributes for the lossy float compression
585  // Note that we do not allow m_compressionBitsHigh >= m_compressionBitsLow
586  // Otherwise is, in any case, a logical error and they'd potentially overwrite each other
587  std::map< unsigned int, std::set< std::string > > comp_attr_map;
588  comp_attr_map[ m_compressionBitsHigh ] = buildCompressionSet( m_compressionDecoderHigh, item_id, item_key );
589  comp_attr_map[ m_compressionBitsLow ] = buildCompressionSet( m_compressionDecoderLow, item_id, item_key );
590 
591  // Print some debugging information regarding the lossy float compression configuration
592  for( const auto& it : comp_attr_map ) {
593  ATH_MSG_DEBUG(std::format(" Comp Attr {} with {} mantissa bits.", it.second.size(), it.first));
594  if ( it.second.size() > 0 ) {
595  for( const auto& attr : it.second ) {
596  ATH_MSG_DEBUG(std::format(" >> {}", attr));
597  }
598  }
599  }
600 
601  // For MetaData objects of type T that are kept in MetaContainers get the MetaCont<T> ID
602  const CLID remapped_item_id = m_metaDataSvc->remapMetaContCLID( item_id );
604  SG::ProxyMap map;
605  bool gotProxies = false;
606  // Look for the clid in storegate
607  SG::DataProxy* match = (*m_currentStore)->proxy(remapped_item_id, item_key, true);
608  if (match != nullptr) {
609  map.insert({item_key, match});
610  iter = map.begin();
611  end = map.end();
612  gotProxies = true;
613  }
614  // Look for the clid in storegate
615  if (!gotProxies && ((*m_currentStore)->proxyRange(remapped_item_id, iter, end)).isSuccess()) {
616  gotProxies = true;
617  }
618  if (gotProxies) {
619  bool added = false, removed = false;
620  // Now loop over any found proxies
621  for (; iter != end; ++iter) {
622  SG::DataProxy* itemProxy(iter->second);
623  std::string proxyName = itemProxy->name();
624  std::string stream;
625  if( m_currentStore == &m_metadataStore ) {
626  // only check metadata keys
627  stream = m_metaDataSvc->removeStreamFromKey(proxyName); // can modify proxyName
628  }
629  // Does this key match the proxy key name - allow for wildcarding and aliases
630  bool keyMatch = ( item_key == "*" ||
631  item_key == proxyName ||
632  itemProxy->hasAlias(item_key) );
633  if (!keyMatch) {
634  // For item list we currently allow wildcards ('*'), which has limited use, e.g.:
635  // xAOD::CutBookkeeperAuxContainer#IncompleteCutBookkeepers*Aux.
636  // Here we look for those few cases...
637  keyMatch = simpleMatch(item_key, proxyName);
638  ATH_MSG_DEBUG(std::format("Result of checking {} against {} to see if it matches is {}",
639  proxyName, item_key, keyMatch));
640  }
641 
642  // Now check if this item is marked for another output stream, if so we reject it
643  // We also reject keys that are marked transient at this point
644  bool xkeyMatch = false;
645  if( (!stream.empty() and stream != m_outputName) || SG::isTransientKey(proxyName) ) {
646  // reject keys that are marked for a different output stream
647  ATH_MSG_DEBUG(std::format("Rejecting key: {} in output: {}", itemProxy->name(), m_outputName.toString()));
648  xkeyMatch = true;
649  }
650 
651  // All right, it passes key match find in itemList, but not in excludeList
652  if (keyMatch && !xkeyMatch) {
653  if (m_forceRead && itemProxy->isValid()) {
654  if (nullptr == itemProxy->accessData()) {
655  ATH_MSG_ERROR(std::format(" Could not get data object for id {},\"{}\"", remapped_item_id, proxyName));
656  }
657  }
658  if (nullptr != itemProxy->object()) {
659  if( std::find(m_objects.begin(), m_objects.end(), itemProxy->object()) == m_objects.end() &&
660  std::find(m_altObjects.begin(), m_altObjects.end(), itemProxy->object()) == m_altObjects.end() )
661  {
662  if( item_id != remapped_item_id ) {
663  // For MetaCont<T>: -
664  // create a temporary DataObject for an entry in the container to pass to CnvSvc
665  DataBucketBase* dbb = static_cast<DataBucketBase*>( itemProxy->object() );
666  const MetaContBase* metaCont = static_cast<MetaContBase*>( dbb->cast( ClassID_traits<MetaContBase>::ID() ) );
667  void* obj = metaCont? metaCont->getAsVoid( m_outSeqSvc->currentRangeID() ) : nullptr;
668  if( obj ) {
669  auto altbucket = std::make_unique<AltDataBucket>(
670  obj, item_id, *CLIDRegistry::CLIDToTypeinfo(item_id), proxyName );
671  m_objects.push_back( altbucket.get() );
672  m_ownedObjects.push_back( std::move(altbucket) );
673  m_altObjects.push_back( itemProxy->object() ); // only for duplicate prevention
674  } else {
675  ATH_MSG_ERROR(std::format("Failed to retrieve object from MetaCont with key={}, for EventRangeID={}",
676  item_key, m_outSeqSvc->currentRangeID()));
677  return StatusCode::FAILURE;
678  }
679  } else if (item.exact()) {
680  // If the exact flag is set, make a new DataObject
681  // holding the object as the requested type.
682  DataBucketBase* dbb = dynamic_cast<DataBucketBase*> (itemProxy->object());
683  if (!dbb) std::abort();
684  void* ptr = dbb->cast (item_id);
685  if (!ptr) {
686  // Hard cast
687  ptr = dbb->object();
688  }
689  auto altbucket =
690  std::make_unique<AltDataBucket>
691  (ptr, item_id,
692  *CLIDRegistry::CLIDToTypeinfo (item_id),
693  *itemProxy);
694  m_objects.push_back(altbucket.get());
695  m_ownedObjects.push_back (std::move(altbucket));
696  m_altObjects.push_back (itemProxy->object());
697  }
698  else
699  m_objects.push_back(itemProxy->object());
700  ATH_MSG_DEBUG(std::format(" Added object {},\"{}\"", item_id, proxyName));
701  }
702 
703  // Build ItemListSvc string
704  std::string tn;
705  std::stringstream tns;
706  if (!m_pCLIDSvc->getTypeNameOfID(item_id, tn).isSuccess()) {
707  ATH_MSG_ERROR(std::format(" Could not get type name for id {},\"{}\"", item_id, proxyName));
708  tns << item_id << '_' << proxyName;
709  } else {
710  tn += '_' + proxyName;
711  tns << tn;
712  }
713 
717  if ((*m_currentStore)->storeID() == StoreID::EVENT_STORE &&
718  item_key.find( RootAuxDynIO::AUX_POSTFIX ) == ( item_key.size() - 4 )) {
719 
720  const SG::IConstAuxStore* auxstore( nullptr );
721  try {
722  SG::fromStorable( itemProxy->object(), auxstore, true );
723  } catch( const std::exception& ) {
724  ATH_MSG_DEBUG(std::format("Error in casting object with CLID {} to SG::IConstAuxStore*", itemProxy->clID()));
725  auxstore = nullptr;
726  }
727 
728  if (auxstore) {
729  handleVariableSelection (*auxstore, *itemProxy,
730  tns.str(), aux_attr,
731  vetoes);
732 
733  // Here comes the compression logic using ThinningInfo
734  // Get a hold of all AuxIDs for this store (static, dynamic etc.)
735  const SG::auxid_set_t allVars = auxstore->getAuxIDs();
736 
737  // Get a handle on the compression information for this store
738  std::string key = item_key;
739  key.erase (key.size()-4, 4);
740 
741  // Build the compression list, retrieve the relevant AuxIDs and
742  // store it in the relevant map that is going to be inserted into
743  // the ThinningCache later on by the ThinningCacheTool
745  compression.setCompressedAuxIDs( comp_attr_map );
746  for( const auto& it : compression.getCompressedAuxIDs( allVars ) ) {
747  if( it.second.size() > 0 ) { // insert only if the set is non-empty
748  compInfo[ key ][ it.first ] = it.second;
749  ATH_MSG_DEBUG(std::format("Container {} has {} variables that'll be "
750  "lossy float compressed with {} mantissa bits",
751  key, it.second.size(), it.first));
752  }
753  } // End of loop over variables to be lossy float compressed
754  } // End of lossy float compression logic
755 
756  }
757 
758  added = true;
759  if (m_itemSvc->addStreamItem(this->name(),tns.str()).isFailure()) {
760  ATH_MSG_WARNING(std::format("Unable to record item {} in Svc", tns.str()));
761  }
762  }
763  } else if (keyMatch && xkeyMatch) {
764  removed = true;
765  }
766  } // proxy loop
767  if (!added && !removed) {
768  ATH_MSG_DEBUG(std::format(" No object matching {},\"{}\" found", item_id, item_key));
769  } else if (removed) {
770  ATH_MSG_DEBUG(std::format(" Object being excluded based on property setting {},\"{}\". Skipping",
771  item_id, item_key));
772  }
773  } else {
774  ATH_MSG_DEBUG(std::format(" Failed to receive proxy iterators from StoreGate for {},\"{}\". Skipping",
775  item_id, item_key));
776  }
777  return StatusCode::SUCCESS;
778 }
779 
785 std::set<std::string>
786 AthenaOutputStream::buildCompressionSet (const ToolHandle<SG::IFolder>& handle,
787  const CLID& item_id,
788  const std::string& item_key) const
789 {
790  // Create an empty result
791  std::set<std::string> result;
792 
793  // Check the item is indeed Aux.
794  if(item_key.find("Aux.") == std::string::npos) {
795  return result;
796  }
797 
798  // First the high compression list
799  for (const auto& iter : *handle) {
800  // First match the IDs for early rejection.
801  if (iter.id() != item_id) {
802  continue;
803  }
804  // Then find the compression item key and the compression list string
805  size_t seppos = iter.key().find('.');
806  std::string comp_item_key{""}, comp_str{""};
807  if(seppos != std::string::npos) {
808  comp_item_key = iter.key().substr(0, seppos+1);
809  comp_str = iter.key().substr(seppos+1);
810  } else {
811  comp_item_key = iter.key();
812  }
813  // Proceed only if the keys match and the
814  // compression list string is not empty
815  if (!comp_str.empty() && comp_item_key == item_key) {
816  std::stringstream ss(comp_str);
817  std::string attr;
818  while( std::getline(ss, attr, '.') ) {
819  result.insert(attr);
820  }
821  }
822  }
823 
824  // All done, return the result
825  return result;
826 }
827 
830  SG::DataProxy& itemProxy,
831  const std::string& tns,
832  const std::string& aux_attr,
833  SG::SelectionVetoes& vetoes) const
834 {
835  // Collect dynamic Aux selection (parse the line, attributes separated by dot)
836  std::set<std::string> attributes;
837  if( aux_attr.size() ) {
838  std::stringstream ss(aux_attr);
839  std::string attr;
840  while( std::getline(ss, attr, '.') ) {
841  attributes.insert(attr);
842  std::stringstream temp;
843  temp << tns << attr;
844  if (m_itemSvc->addStreamItem(this->name(),temp.str()).isFailure()) {
845  ATH_MSG_WARNING(std::format("Unable to record item {} in Svc", temp.str()));
846  }
847  }
848  }
849 
850  // Return early if there's no selection.
851  if (attributes.empty()) {
852  return;
853  }
854 
855  std::string key = itemProxy.name();
856  if (key.size() >= 4 && key.compare (key.size()-4, 4, "Aux.")==0)
857  {
858  key.erase (key.size()-4, 4);
859  }
860 
861  // Find the entry for the selection.
862  SG::auxid_set_t& vset = vetoes[key];
863 
864  // Form the veto mask for this object.
866  sel.selectAux (attributes);
867 
868  // Get all the AuxIDs that we know of and the selected ones
869  SG::auxid_set_t all = auxstore.getAuxIDs();
870  SG::auxid_set_t selected = sel.getSelectedAuxIDs( all );
871 
872  // Loop over all and build a list of vetoed AuxIDs from non selected ones
873  for( const SG::auxid_t auxid : all ) {
874  if ( !selected.test( auxid ) ) {
875  vset.insert( auxid );
876  }
877  }
878 }
879 
880 // Here comes the list handlers...
881 void AthenaOutputStream::itemListHandler(Gaudi::Details::PropertyBase& /* theProp */) {
882  // Assuming concrete SG::Folder also has an itemList property
883  IProperty *pAsIProp(nullptr);
884  if ((m_p2BWritten.retrieve()).isFailure() ||
885  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
886  (pAsIProp->setProperty(m_itemList)).isFailure()) {
887  throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
888  }
889 }
890 
891 void AthenaOutputStream::compressionListHandlerHigh(Gaudi::Details::PropertyBase& /* theProp */) {
892  IProperty *pAsIProp(nullptr);
893  if ((m_compressionDecoderHigh.retrieve()).isFailure() ||
894  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_compressionDecoderHigh)) ||
895  (pAsIProp->setProperty("ItemList", m_compressionListHigh.toString())).isFailure()) {
896  throw GaudiException("Folder property [ItemList] not found", name(), StatusCode::FAILURE);
897  }
898 }
899 
900 void AthenaOutputStream::compressionListHandlerLow(Gaudi::Details::PropertyBase& /* theProp */) {
901  IProperty *pAsIProp(nullptr);
902  if ((m_compressionDecoderLow.retrieve()).isFailure() ||
903  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_compressionDecoderLow)) ||
904  (pAsIProp->setProperty("ItemList", m_compressionListLow.toString())).isFailure()) {
905  throw GaudiException("Folder property [ItemList] not found", name(), StatusCode::FAILURE);
906  }
907 }
908 
909 // Reinitialize the internal state of the algorithm
910 // for I/O purposes (e.g. upon @c fork(2))
912  ATH_MSG_INFO("I/O reinitialization...");
913  m_incidentSvc->removeListener(this, "MetaDataStop"); // Remove any existing listener to avoid handling the incident multiple times
914  m_incidentSvc->addListener(this, "MetaDataStop", 50);
915  for (auto& tool : m_helperTools) {
916  if (!tool->postInitialize().isSuccess()) {
917  ATH_MSG_ERROR("Cannot initialize helper tool");
918  }
919  }
920  return StatusCode::SUCCESS;
921 }
922 
923 // Finalize the internal state of the algorithm
924 // for I/O purposes (e.g. upon @c fork(2))
926  ATH_MSG_INFO("I/O finalization...");
927  for (auto& tool : m_helperTools) {
928  if (!tool->preFinalize().isSuccess()) {
929  ATH_MSG_ERROR("Cannot finalize helper tool");
930  }
931  }
932  const Incident metaDataStopIncident(name(), "MetaDataStop");
933  this->handle(metaDataStopIncident);
934  m_incidentSvc->removeListener(this, "MetaDataStop");
935  if (m_dataStore->clearStore().isFailure()) {
936  ATH_MSG_WARNING("Cannot clear the DataStore");
937  }
938  return StatusCode::SUCCESS;
939 }
940 
947 {
948  m_dictLoader->load_type (clid);
949 
950  // Also load the persistent class dictionary, if applicable.
951  std::unique_ptr<ITPCnvBase> tpcnv = m_tpCnvSvc->t2p_cnv_unique (clid);
952  if (tpcnv) {
953  m_dictLoader->load_type (tpcnv->persistentTInfo());
954  }
955 }
956 
958 bool AthenaOutputStream::simpleMatch(const std::string& pattern,
959  const std::string& text) {
960  size_t pi = 0, ti = 0, star = std::string::npos, match = 0;
961  while (ti < text.size()) {
962  if (pi < pattern.size() && (pattern[pi] == text[ti] || pattern[pi] == '*')) {
963  if (pattern[pi] == '*') { star = pi++; match = ti; }
964  else { ++pi; ++ti; }
965  } else if (star != std::string::npos) {
966  pi = star + 1;
967  ti = ++match;
968  } else return false;
969  }
970  while (pi < pattern.size() && pattern[pi] == '*') ++pi;
971  return pi == pattern.size();
972 }
mergePhysValFiles.pattern
pattern
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:25
AthenaOutputStream::finalize
virtual StatusCode finalize() override
Definition: AthenaOutputStream.cxx:379
AthenaOutputStream::m_rangeIDforRangeFN
std::map< std::string, std::string > m_rangeIDforRangeFN
map of RangeIDs (as used by the Sequencer) for each Range filename generated
Definition: AthenaOutputStream.h:201
createLinkingScheme.iter
iter
Definition: createLinkingScheme.py:62
AthenaOutputStream::writeMetaData
void writeMetaData(const std::string &outputFN="")
Write MetaData for this stream (by default) or for a substream outputFN (in ES mode)
Definition: AthenaOutputStream.cxx:319
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
AthenaOutputStream::m_outputAttributes
std::string m_outputAttributes
Output attributes.
Definition: AthenaOutputStream.h:233
SGIFolder.h
get_generator_info.result
result
Definition: get_generator_info.py:21
RootAuxDynIO::AUX_POSTFIX
constexpr char AUX_POSTFIX[]
Common post-fix for the names of auxiliary containers in StoreGate.
Definition: RootAuxDynDefs.h:12
PowhegControl_ttHplus_NLO.ss
ss
Definition: PowhegControl_ttHplus_NLO.py:83
LArConditions2Ntuple.objects
objects
Definition: LArConditions2Ntuple.py:64
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
find
std::string find(const std::string &s)
return a remapped string
Definition: hcg.cxx:135
AthenaOutputStream::addItemObjects
StatusCode addItemObjects(const SG::FolderItem &, SG::SelectionVetoes &vetoes, SG::CompressionInfo &compInfo)
Add item data objects to output streamer list.
Definition: AthenaOutputStream.cxx:564
DataBucketBase
A non-templated base class for DataBucket, allows to access the transient object address as a void*.
Definition: DataBucketBase.h:24
SG::isTransientKey
bool isTransientKey(const std::string &key)
Test to see if a key is transoent.
Definition: transientKey.h:41
SG::fromStorable
bool fromStorable(DataObject *pDObj, T *&pTrans, bool quiet=false, IRegisterTransient *irt=0, bool isConst=true)
Definition: StorableConversions.h:169
AthenaOutputStream::m_itemList
StringArrayProperty m_itemList
Vector of item names.
Definition: AthenaOutputStream.h:117
AthenaOutputStream::buildCompressionSet
std::set< std::string > buildCompressionSet(const ToolHandle< SG::IFolder > &handle, const CLID &item_id, const std::string &item_key) const
Helper function for building the compression lists.
Definition: AthenaOutputStream.cxx:786
AthenaOutputStream::m_dataStore
ServiceHandle< StoreGateSvc > m_dataStore
Handle to the StoreGateSvc store where the data we want to write out resides.
Definition: AthenaOutputStream.h:100
AthenaOutputStream::m_compressionListLow
StringArrayProperty m_compressionListLow
Vector of item names.
Definition: AthenaOutputStream.h:130
DataBucketBase::object
virtual void * object()=0
SG::SelectionVetoes
std::unordered_map< std::string, SG::auxid_set_t > SelectionVetoes
Map of vetoed variables.
Definition: SelectionVetoes.h:50
IAthenaOutputStreamTool
This is a tool that allows streaming out of DataObjects. This has been factorized out from AthenaOutp...
Definition: IAthenaOutputStreamTool.h:69
skel.it
it
Definition: skel.GENtoEVGEN.py:407
pool::WRITE
@ WRITE
Definition: Database/APR/StorageSvc/StorageSvc/pool.h:72
SG::DataProxy::accessData
DataObject * accessData()
Access DataObject on-demand using conversion service.
CLIDRegistry::CLIDToTypeinfo
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
Definition: CLIDRegistry.cxx:136
AthenaOutputStream::initialize
virtual StatusCode initialize() override
Definition: AthenaOutputStream.cxx:73
AthenaOutputStream::m_forceRead
BooleanProperty m_forceRead
set to true to force read of data objects in item list
Definition: AthenaOutputStream.h:149
SG::DataProxy::isValid
bool isValid() const
called by destructor
python.RatesEmulationExample.lock
lock
Definition: RatesEmulationExample.py:148
AthenaOutputStream::loadDict
void loadDict(CLID clid)
Helper function to load dictionaries (both transient and persistent) for a given type.
Definition: AthenaOutputStream.cxx:946
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
AthenaOutputStream.h
dbg::ptr
void * ptr(T *p)
Definition: SGImplSvc.cxx:74
FilteredAlgorithm::initialize
virtual StatusCode initialize()
Definition: FilteredAlgorithm.cxx:46
ITPCnvBase.h
DataBucketBase::cast
T * cast(SG::IRegisterTransient *irt=0, bool isConst=true)
Return the contents of the DataBucket, converted to type T.
AthenaOutputStream::m_transientItems
StringArrayProperty m_transientItems
List of items that are known to be present in the transient store (and hence we can make input depend...
Definition: AthenaOutputStream.h:140
AthenaOutputStream::m_objects
IDataSelector m_objects
Collection of objects being selected.
Definition: AthenaOutputStream.h:174
AthenaOutputStream::m_metadataItemList
StringArrayProperty m_metadataItemList
Vector of item names.
Definition: AthenaOutputStream.h:120
xAOD::AuxCompression
Definition: AuxCompression.h:20
AthenaOutputStream::m_currentStore
ServiceHandle< StoreGateSvc > * m_currentStore
Definition: AthenaOutputStream.h:102
AthenaPoolTestWrite.stream
string stream
Definition: AthenaPoolTestWrite.py:12
transientKey.h
ProxyMap.h
pi
#define pi
Definition: TileMuonFitter.cxx:65
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:92
SG::ProxyMap
std::map< std::string, DataProxy * > ProxyMap
Definition: ProxyMap.h:24
AthenaOutputStream::collectAllObjects
StatusCode collectAllObjects()
Collect data objects for output streamer list.
Definition: AthenaOutputStream.cxx:530
SG::makeHandle
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
Definition: ReadCondHandle.h:274
MetaContBase
Definition: MetaCont.h:24
AthenaOutputStream::m_compressionListHigh
StringArrayProperty m_compressionListHigh
Vector of item names.
Definition: AthenaOutputStream.h:127
AthenaOutputStream::m_streamer
ToolHandle< IAthenaOutputStreamTool > m_streamer
pointer to AthenaOutputStreamTool
Definition: AthenaOutputStream.h:185
AthenaOutputStream::m_outSeqSvc
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
Definition: AthenaOutputStream.h:111
WriteHandle.h
Handle class for recording to StoreGate.
SG::auxid_t
size_t auxid_t
Identifier for a particular aux data item.
Definition: AuxTypes.h:27
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
MetaDataSvc::ToolLockGuard
Definition: MetaDataSvc.h:243
AthenaOutputStream::AthenaOutputStream
AthenaOutputStream(const std::string &name, ISvcLocator *pSvcLocator)
Standard algorithm Constructor.
Definition: AthenaOutputStream.cxx:43
AthenaOutputStream::m_compressionBitsHigh
UnsignedIntegerProperty m_compressionBitsHigh
Number of mantissa bits in the float compression.
Definition: AthenaOutputStream.h:133
RootAuxDynDefs.h
lumiFormat.i
int i
Definition: lumiFormat.py:85
AthenaOutputStream::m_itemSvc
ServiceHandle< IItemListSvc > m_itemSvc
Handles to all the necessary services.
Definition: AthenaOutputStream.h:105
CxxUtils::ConcurrentBitset::insert
ConcurrentBitset & insert(bit_t bit, bit_t new_nbits=0)
Set a bit to 1.
AthenaOutputStream::m_slotRangeMap
std::map< unsigned, std::string > m_slotRangeMap
map of filenames assigned to active slots
Definition: AthenaOutputStream.h:198
IAuxStoreIO.h
Interface providing I/O for a generic auxiliary store.
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
sel
sel
Definition: SUSYToolsTester.cxx:92
AthenaOutputStream::m_outputName
StringProperty m_outputName
Name of the output file.
Definition: AthenaOutputStream.h:143
AthenaOutputStream::compressionListHandlerLow
void compressionListHandlerLow(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
Definition: AthenaOutputStream.cxx:900
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:37
calibdata.exception
exception
Definition: calibdata.py:495
IAthenaOutputStreamTool::connectOutput
virtual StatusCode connectOutput(const std::string &outputName="")=0
Connect to the output stream Must connectOutput BEFORE streaming Only specify "outputName" if one wan...
LHEF::Reader
Pythia8::Reader Reader
Definition: Prophecy4fMerger.cxx:11
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:194
plotIsoValidation.el
el
Definition: plotIsoValidation.py:197
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
IAthenaOutputStreamTool::connectServices
virtual StatusCode connectServices(const std::string &dataStore, const std::string &cnvSvc, bool extendProvenenceRecord=false)=0
Specify which data store and conversion service to use and whether to extend provenence Only use if o...
AthenaOutputStream::m_helperTools
ToolHandleArray< IAthenaOutputTool > m_helperTools
vector of AlgTools that that are executed by this stream
Definition: AthenaOutputStream.h:188
AthenaOutputStream::m_ownedObjects
std::vector< std::unique_ptr< DataObject > > m_ownedObjects
Collection of DataObject instances owned by this service.
Definition: AthenaOutputStream.h:182
SG::VarHandleKey::initialize
StatusCode initialize(bool used=true)
If this object is used as a property, then this should be called during the initialize phase.
Definition: AthToolSupport/AsgDataHandles/Root/VarHandleKey.cxx:103
AthenaOutputStream::compressionListHandlerHigh
void compressionListHandlerHigh(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
Definition: AthenaOutputStream.cxx:891
DataHeader.h
This file contains the class definition for the DataHeader and DataHeaderElement classes.
AthenaOutputStream::handleVariableSelection
void handleVariableSelection(const SG::IConstAuxStore &auxstore, SG::DataProxy &itemProxy, const std::string &tns, const std::string &aux_attr, SG::SelectionVetoes &vetoes) const
Here we build the vetoed AuxIDs.
Definition: AthenaOutputStream.cxx:829
CLID
uint32_t CLID
The Class ID type.
Definition: Event/xAOD/xAODCore/xAODCore/ClassID_traits.h:47
AthenaOutputStream::io_finalize
virtual StatusCode io_finalize() override
Definition: AthenaOutputStream.cxx:925
AthenaOutputStream::m_persName
StringProperty m_persName
Name of the persistency service capable to write data from the store.
Definition: AthenaOutputStream.h:146
SG::DataProxy::clID
CLID clID() const
Retrieve clid.
AthenaOutputStream::m_writeMetadataAndDisconnect
bool m_writeMetadataAndDisconnect
Definition: AthenaOutputStream.h:191
AthenaOutputStream::m_extendProvenanceRecord
BooleanProperty m_extendProvenanceRecord
Set to false to omit adding the current DataHeader into the DataHeader history This will cause the in...
Definition: AthenaOutputStream.h:153
AthenaOutputStream::m_itemListFromTool
BooleanProperty m_itemListFromTool
Set to write out everything in input DataHeader.
Definition: AthenaOutputStream.h:156
AthenaOutputStream::m_transient
ToolHandle< SG::IFolder > m_transient
Decoded list of transient ids.
Definition: AthenaOutputStream.h:168
AthenaOutputStream::finalizeRange
void finalizeRange(const std::string &rangeFN)
Definition: AthenaOutputStream.cxx:299
AthenaOutputStream::m_incidentSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
Definition: AthenaOutputStream.h:109
SG::CompressionInfo
std::unordered_map< std::string, SG::ThinningInfo::compression_map_t > CompressionInfo
Map of compressed variables and their compression levels.
Definition: CompressionInfo.h:37
AthenaOutputStream::itemListHandler
void itemListHandler(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
Definition: AthenaOutputStream.cxx:881
xAOD::AuxSelection
Class helping in dealing with dynamic branch selection.
Definition: AuxSelection.h:31
id
SG::auxid_t id
Definition: Control/AthContainers/Root/debug.cxx:239
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AuxCompression.h
AthenaOutputStream::m_metadataStore
ServiceHandle< StoreGateSvc > m_metadataStore
Definition: AthenaOutputStream.h:101
AtlCoolConsole.tool
tool
Definition: AtlCoolConsole.py:452
AthenaOutputStream::io_reinit
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
Definition: AthenaOutputStream.cxx:911
item
Definition: ItemListSvc.h:43
AthenaOutputStream::m_tpCnvSvc
ServiceHandle< ITPCnvSvc > m_tpCnvSvc
Definition: AthenaOutputStream.h:108
SG::DataProxy::name
virtual const name_type & name() const override final
Retrieve data object key == string.
AthenaOutputStream::m_streamName
StringProperty m_streamName
Stream name (defaults to algorithm name)
Definition: AthenaOutputStream.h:114
AthenaOutputStream::m_compressionBitsLow
UnsignedIntegerProperty m_compressionBitsLow
Number of mantissa bits in the float compression.
Definition: AthenaOutputStream.h:136
AthenaOutputStream::m_altObjects
IDataSelector m_altObjects
Objects overridden by ‘exact’ handling.
Definition: AthenaOutputStream.h:177
python.LumiBlobConversion.pos
pos
Definition: LumiBlobConversion.py:16
IAthenaOutputStreamTool::commitOutput
virtual StatusCode commitOutput(bool doCommit=false)=0
Commit the output stream after having streamed out objects Must commitOutput AFTER streaming.
AthenaOutputStream::m_dictLoader
ServiceHandle< IDictLoaderSvc > m_dictLoader
Definition: AthenaOutputStream.h:107
AthenaOutputStream::m_metaDataSvc
ServiceHandle< MetaDataSvc > m_metaDataSvc
Definition: AthenaOutputStream.h:106
CLIDRegistry.h
a static registry of CLID->typeName entries. NOT for general use. Use ClassIDSvc instead.
AuxSelection.h
SG::DataProxy::hasAlias
bool hasAlias(const std::string &key) const
Test to see if a given string is in the alias set.
AthenaOutputStream::m_events
std::atomic< int > m_events
Number of events written to this output stream.
Definition: AthenaOutputStream.h:194
AthenaOutputStream::simpleMatch
bool simpleMatch(const std::string &pattern, const std::string &text)
Glob-style matcher, where the only meta-character is '*'.
Definition: AthenaOutputStream.cxx:958
AthenaOutputStream::m_compressionDecoderHigh
ToolHandle< SG::IFolder > m_compressionDecoderHigh
The top-level folder with items to be compressed high.
Definition: AthenaOutputStream.h:162
AthenaOutputStream::m_mutex
mutex_t m_mutex
mutex for this Stream write() and handle() methods
Definition: AthenaOutputStream.h:207
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
ITPCnvBase::persistentTInfo
virtual const std::type_info & persistentTInfo() const =0
return C++ type id of the persistent class this converter is for
AltDataBucket.h
AthenaOutputStream::execute
virtual StatusCode execute() override
Definition: AthenaOutputStream.cxx:407
TransientAddress.h
AthenaOutputStream::m_streamerMap
std::map< std::string, std::unique_ptr< IAthenaOutputStreamTool > > m_streamerMap
map of streamerTools handling event ranges in MT
Definition: AthenaOutputStream.h:204
IAuxStore.h
Interface for non-const operations on an auxiliary store.
makeTransCanvas.text
text
Definition: makeTransCanvas.py:11
SG::auxid_set_t
A set of aux data identifiers.
Definition: AuxTypes.h:47
merge.status
status
Definition: merge.py:16
StoreID::EVENT_STORE
@ EVENT_STORE
Definition: StoreID.h:26
AthenaOutputStream::write
virtual StatusCode write()
Stream the data.
Definition: AthenaOutputStream.cxx:443
AthenaOutputStream::m_pCLIDSvc
ServiceHandle< IClassIDSvc > m_pCLIDSvc
Definition: AthenaOutputStream.h:110
AthenaOutputStream::m_selVetoesKey
SG::WriteHandleKey< SG::SelectionVetoes > m_selVetoesKey
Key used for recording selected dynamic variable information to the event store.
Definition: AthenaOutputStream.h:225
IConstAuxStore.h
Interface for const operations on an auxiliary store.
SG::IConstAuxStore
Interface for const operations on an auxiliary store.
Definition: IConstAuxStore.h:64
GetLBsToIgnore.outputFN
outputFN
Definition: GetLBsToIgnore.py:224
python.PyAthena.obj
obj
Definition: PyAthena.py:132
MetaContBase::getAsVoid
virtual void * getAsVoid(const SourceID &sid) const =0
IAthenaOutputStreamTool::streamObjects
virtual StatusCode streamObjects(const TypeKeyPairs &typeKeys, const std::string &outputName="")=0
SG::DataProxy
Definition: DataProxy.h:45
Cut::all
@ all
Definition: SUSYToolsAlg.cxx:67
StoreGateSvc.h
SG::FolderItem
a Folder item (data object) is identified by the clid/key pair
Definition: SGFolderItem.h:24
python.TransformConfig.attributes
def attributes(self)
Definition: TransformConfig.py:384
SG::IConstAuxStore::getAuxIDs
virtual const SG::auxid_set_t & getAuxIDs() const =0
Return a set of identifiers for existing data items in this store.
SG::ConstProxyIterator
ProxyMap::const_iterator ConstProxyIterator
Definition: ProxyMap.h:28
match
bool match(std::string s1, std::string s2)
match the individual directories of two strings
Definition: hcg.cxx:356
AthenaOutputStream::handle
virtual void handle(const Incident &incident) override
Incident service handle listening for MetaDataStop.
Definition: AthenaOutputStream.cxx:224
AthenaOutputStream::m_p2BWritten
ToolHandle< SG::IFolder > m_p2BWritten
The top-level folder with items to be written.
Definition: AthenaOutputStream.h:159
fitman.k
k
Definition: fitman.py:528
AthenaOutputStream::clearSelection
void clearSelection()
Clear list of selected objects.
Definition: AthenaOutputStream.cxx:523
python.BeamSpotUpdate.compression
compression
Definition: BeamSpotUpdate.py:188
AthenaOutputStream::m_compressionDecoderLow
ToolHandle< SG::IFolder > m_compressionDecoderLow
The top-level folder with items to be compressed low.
Definition: AthenaOutputStream.h:165
ServiceHandle< IIoComponentMgr >
mapkey::key
key
Definition: TElectronEfficiencyCorrectionTool.cxx:37
AthenaOutputStream::~AthenaOutputStream
virtual ~AthenaOutputStream()
Standard Destructor.
Definition: AthenaOutputStream.cxx:67
CxxUtils::ConcurrentBitset::test
bool test(bit_t bit) const
Test to see if a bit is set.
AthenaOutputStream::m_compInfoKey
SG::WriteHandleKey< SG::CompressionInfo > m_compInfoKey
Key used for recording lossy float compressed variable information to the event store.
Definition: AthenaOutputStream.h:230
DataProxy.h