ATLAS Offline Software
AthenaOutputStream.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
5 #include "AthenaOutputStream.h"
6 
7 // STL include files
8 #include <cassert>
9 #include <format>
10 #include <sstream>
11 
12 // Framework include files
18 #include "GaudiKernel/AlgTool.h"
19 #include "GaudiKernel/ClassID.h"
20 #include "GaudiKernel/GaudiException.h"
21 #include "GaudiKernel/IAlgManager.h"
22 #include "GaudiKernel/IIoComponentMgr.h"
23 #include "GaudiKernel/IOpaqueAddress.h"
24 #include "GaudiKernel/IProperty.h"
25 #include "GaudiKernel/ISvcLocator.h"
26 #include "GaudiKernel/MsgStream.h"
29 #include "SGTools/DataProxy.h"
30 #include "SGTools/ProxyMap.h"
31 #include "SGTools/SGIFolder.h"
33 #include "SGTools/transientKey.h"
34 #include "StoreGate/StoreGateSvc.h"
35 #include "StoreGate/WriteHandle.h"
37 #include "xAODCore/AuxSelection.h"
38 
39 // Local include files
40 #include "AltDataBucket.h"
41 
42 // Standard Constructor
// Constructor.
// Forwards `name` and `pSvcLocator` to the base class, points m_currentStore
// at m_dataStore, and builds the tool handles whose type/name strings embed
// the stream name: four SG::Folder tools (top folder, high/low compression
// decoders, transient folder) and the AthenaOutputStreamTool writer.
43 AthenaOutputStream::AthenaOutputStream(const std::string& name, ISvcLocator* pSvcLocator)
44  : base_class(name, pSvcLocator),
45  m_currentStore(&m_dataStore),
46  m_p2BWritten(std::format("SG::Folder/{}_TopFolder", name), this),
47  m_compressionDecoderHigh(std::format("SG::Folder/{}_compressed_high", name), this),
48  m_compressionDecoderLow(std::format("SG::Folder/{}_compressed_low", name), this),
49  m_transient(std::format("SG::Folder/{}_transient", name), this),
50  m_streamer(std::format("AthenaOutputStreamTool/{}Tool", name), this)
51 {
52  // Ensure the service locator is good
53  assert(pSvcLocator);
54 
55  // This property depends on the name that's known at construction time
56  // Therefore, do it the old fashioned way
57  declareProperty("WritingTool", m_streamer);
58 
59  // Register itemListHandler as the update handler of the item list
60  // property (m_itemList)
61  m_itemList.declareUpdateHandler(&AthenaOutputStream::itemListHandler, this);
64 }
65 
66 // Standard Destructor
// NOTE(review): the signature line is not visible here.
// Empties m_streamerMap, the map in which write() stores the per-output-file
// stream tools.
68  // Clear the internal caches
69  m_streamerMap.clear();
70 }
71 
72 // Initialize data writer
// NOTE(review): the signature line is not visible here.
// Visible steps, in order: reset the event counter; retrieve the stores,
// services, stream tool and helper tools; register with the IoComponentMgr;
// declare input dependencies for configured transient items; load
// dictionaries; register incident listeners; check the float-compression
// settings; derive the stream name and the keys built from it.
// Ends with StatusCode::SUCCESS; explicit FAILURE returns are marked below.
74  ATH_MSG_DEBUG("In initialize");
75 
76  // Initialize the FilteredAlgorithm base
78 
79  // Reset the number of events written
80  m_events = 0;
81 
82  // Set up the SG services
83  ATH_CHECK( m_dataStore.retrieve() );
84  ATH_MSG_DEBUG(std::format("Found {} store.", m_dataStore.typeAndName()));
   // The metadata store is only retrieved when a metadata item list is configured
85  if (!m_metadataItemList.value().empty()) {
86  ATH_CHECK( m_metadataStore.retrieve() );
87  ATH_MSG_DEBUG(std::format("Found {} store.", m_metadataStore.typeAndName()));
88  }
89 
90  // Set up various services
91  ATH_CHECK( m_pCLIDSvc.retrieve() );
92  ATH_CHECK( m_dictLoader.retrieve() );
93  ATH_CHECK( m_tpCnvSvc.retrieve() );
94  ATH_CHECK( m_outSeqSvc.retrieve() );
95 
96  // Get Output Stream tool for writing
97  ATH_CHECK( m_streamer.retrieve() );
98  ATH_CHECK( m_streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord) );
99 
100  ATH_CHECK( m_helperTools.retrieve() );
101  ATH_MSG_INFO("Found " << m_helperTools);
102  ATH_MSG_INFO(std::format("Data output: {}", m_outputName.toString()));
103 
104  for (auto& tool : m_helperTools) {
105  ATH_CHECK( tool->postInitialize() );
106  }
107 
108  // Register this algorithm for 'I/O' events
109  ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
110  ATH_CHECK( iomgr.retrieve() );
111  ATH_CHECK( iomgr->io_register(this) );
112  ATH_CHECK( iomgr->io_register(this, IIoComponentMgr::IoMode::WRITE, m_outputName) );
113  ATH_CHECK( this->io_reinit() );
114 
115  // Add an explicit input dependency for everything in our item list
116  // that we know from the configuration is in the transient store.
117  // We don't want to add everything on the list, because configurations
118  // often initialize this with a maximal static list of everything
119  // that could possibly be written.
120  {
121  ATH_CHECK( m_transient.retrieve() );
122  IProperty *pAsIProp = dynamic_cast<IProperty*> (&*m_transient);
123  if (!pAsIProp) {
124  ATH_MSG_FATAL ("Bad folder interface");
125  return StatusCode::FAILURE;
126  }
   // Fill the scratch folder m_transient from the configured transient item list
127  ATH_CHECK (pAsIProp->setProperty("ItemList", m_transientItems.toString()));
128 
129  for (const SG::FolderItem& item : *m_p2BWritten) {
130  // Load ROOT dictionaries now.
131  loadDict (item.id());
132 
   // Keys containing '*' (wildcard) or '.' never get an explicit dependency
133  const std::string& k = item.key();
134  if (k.find('*') != std::string::npos) continue;
135  if (k.find('.') != std::string::npos) continue;
   // Add a Reader dependency only if the same (id, key) is in the transient folder
136  for (const SG::FolderItem& titem : *m_transient) {
137  if (titem.id() == item.id() && titem.key() == k) {
138  DataObjID id (item.id(), std::format("{}+{}", m_dataStore.name(), k));
139  this->addDependency (id, Gaudi::DataHandle::Reader);
140  break;
141  }
142  }
143  }
144  m_transient->clear();
145  }
146 
147  // Also load dictionaries for metadata classes.
   // m_transient is reused as a scratch folder to decode the metadata item list.
148  if (!m_metadataItemList.value().empty()) {
149  IProperty *pAsIProp = dynamic_cast<IProperty*> (&*m_transient);
150  if (!pAsIProp) {
151  ATH_MSG_FATAL ("Bad folder interface");
152  return StatusCode::FAILURE;
153  }
154  ATH_CHECK (pAsIProp->setProperty("ItemList", m_metadataItemList.toString()));
155  for (const SG::FolderItem& item : *m_transient) {
156  loadDict (item.id());
157  }
158  m_transient->clear();
159  }
160 
161  // Also make sure we have the dictionary for Token.
162  m_dictLoader->load_type ("Token");
163 
164  // Listen to event range incidents if incident name is configured
165  ATH_CHECK( m_incidentSvc.retrieve() );
166  if( !m_outSeqSvc->incidentName().empty() ) {
167  // use priority 95 to make sure the Output Sequencer goes first (it has priority 100)
168  m_incidentSvc->addListener(this, IncidentType::BeginProcessing, 95);
169  m_incidentSvc->addListener(this, IncidentType::EndProcessing, 95);
170  }
171 
172  // Check compression settings and print some information about the configuration
173  // Both should be between [5, 23] and high compression should be < low compression
   // NOTE(review): the statements that apply the limit announced by the
   // "Setting it to the appropriate limit." messages are not visible here.
174  if(m_compressionBitsHigh < 5u || m_compressionBitsHigh > 23u) {
175  ATH_MSG_INFO(std::format("Float compression mantissa bits for high compression "
176  "({}) is outside the allowed range of [5, 23].",
177  m_compressionBitsHigh.toString()));
178  ATH_MSG_INFO("Setting it to the appropriate limit.");
180  }
181  if(m_compressionBitsLow < 5u || m_compressionBitsLow > 23u) {
182  ATH_MSG_INFO(std::format("Float compression mantissa bits for low compression "
183  "({}) is outside the allowed range of [5, 23].",
184  m_compressionBitsLow.toString()));
185  ATH_MSG_INFO("Setting it to the appropriate limit.");
187  }
   // NOTE(review): the condition guarding the error below is not visible here;
   // going by the message it presumably fires when low <= high - confirm.
189  ATH_MSG_ERROR(std::format("Float compression mantissa bits for low compression "
190  "({}) is lower than or equal to high compression "
191  "({})! Please check the configuration! ",
192  m_compressionBitsLow.toString(),
193  m_compressionBitsHigh.toString()));
194  return StatusCode::FAILURE;
195  }
196  if(m_compressionListHigh.value().empty() && m_compressionListLow.value().empty()) {
197  ATH_MSG_VERBOSE("Both high and low float compression lists are empty. Float compression will NOT be applied.");
198  } else {
199  ATH_MSG_INFO("Either high or low (or both) float compression lists are defined. Float compression will be applied.");
200  ATH_MSG_INFO(std::format("High compression will use {} mantissa bits, and "
201  "low compression will use {} mantissa bits.",
202  m_compressionBitsHigh.toString(),
203  m_compressionBitsLow.toString()));
204  }
205 
206  // Setup stream name
   // Falls back to the algorithm instance name when no stream name is configured
207  if (m_streamName.empty()) {
208  m_streamName.setValue(this->name());
209  }
210 
211  // Set SG key for selected variable information.
212  m_selVetoesKey = std::format("SelectionVetoes_{}", m_streamName.toString());
214 
   // Same naming scheme for the key of the lossy float compression information
215  m_compInfoKey = std::format("CompressionInfo_{}", m_streamName.toString());
217 
218  ATH_MSG_DEBUG("End initialize");
219  return StatusCode::SUCCESS;
220 }
221 
222 // Handle incidents
// Incident handler.
// Holds m_mutex for the whole call. Two cases are handled:
//  - "MetaDataStop": with the output sequencer in use and in concurrent-events
//    mode, every remaining entry of m_streamerMap is finalized; otherwise
//    writeMetaData() is called, unless the early return for a last incident
//    of "EndEvent" is taken.
//  - any other incident while the output sequencer is in use: per-slot event
//    range bookkeeping in m_slotRangeMap, finalizing a range when the slot
//    that changes or ends was the only one still mapped to it.
// Throws GaudiException when such an incident carries an invalid slot.
223 void AthenaOutputStream::handle(const Incident& inc)
224 {
225  ATH_MSG_DEBUG(std::format("handle() incident type: {}", inc.type()));
226  // mutex shared with write() which is called from writeMetaData
227  std::unique_lock<mutex_t> lock(m_mutex);
228 
229  if( inc.type() == "MetaDataStop" ) {
230  if( m_outSeqSvc->inUse() ) {
231  if( m_outSeqSvc->inConcurrentEventsMode() ) {
232  // EventService MT - write metadata and close all remaining substreams
   // finalizeRange() erases the entry it is given, so the loop terminates
233  while( m_streamerMap.size() > 0 ) {
234  finalizeRange( m_streamerMap.begin()->first );
235  }
236  return;
237  }
238  if( m_outSeqSvc->lastIncident() == "EndEvent" ) {
239  // in r22 EndEvent comes before output writing
240  // - queue metadata writing and disconnect for after Event write
   // NOTE(review): the statement that queues the write is not visible here.
242  return;
243  }
244  }
245  // not in Event Service
246  writeMetaData();
247  }
248  else if( m_outSeqSvc->inUse() ) {
249  // Handle Event Ranges for Event Service
250  EventContext::ContextID_t slot = inc.context().slot();
251  if( slot == EventContext::INVALID_CONTEXT_ID ) {
252  throw GaudiException("Received Incident with invalid slot in ES mode", name(), StatusCode::FAILURE);
253  }
   // Number of slots currently mapped to the given range file name
254  auto count_events_in_range = [&](const std::string& range) {
255  return std::count_if(m_slotRangeMap.cbegin(), m_slotRangeMap.cend(),
256  [&](auto& el){return el.second == range;} );
257  };
258  if( inc.type() == IncidentType::BeginProcessing ) {
259  // get the current/old range filename for this slot
260  const std::string rangeFN = m_slotRangeMap[ slot ];
261  // build the new range filename for this slot
262  const std::string newRangeFN = m_outSeqSvc->buildSequenceFileName( m_outputName );
263  if( !rangeFN.empty() and rangeFN != newRangeFN ) {
264  ATH_MSG_INFO(std::format("Slot range change: '{}' -> '{}'", rangeFN, newRangeFN));
265  ATH_MSG_DEBUG(std::format("There are {} slots in use",m_slotRangeMap.size()));
266  for(const auto & range : m_slotRangeMap ) {
267  ATH_MSG_DEBUG(std::format("Slot: {} FN={}", range.first, range.second));
268  }
   // This slot was the only one still mapped to the old range: close it
269  if( count_events_in_range(rangeFN) == 1 ) {
270  finalizeRange( rangeFN );
271  }
272  }
273  ATH_MSG_INFO(std::format("slot {} processing event in range: {}", slot, newRangeFN));
274  m_slotRangeMap[ slot ] = newRangeFN;
275  // remember the RangeID for this slot so we can write metadata *after* a range change
276  m_rangeIDforRangeFN[ newRangeFN ] = m_outSeqSvc->currentRangeID();
277  }
278  else if( inc.type() == IncidentType::EndProcessing ) {
279  ATH_MSG_DEBUG(std::format("There are {} slots in use", m_slotRangeMap.size()));
280  for( const auto& range : m_slotRangeMap ) {
281  ATH_MSG_DEBUG(std::format("Slot: {} FN={}", range.first, range.second));
282  }
283  if( m_slotRangeMap.size() > 1 ) {
284  // if there are multiple slots, we can detect if the range ended with this event
285  // - except the last range, because there is no next range to clear the slot map
286  const std::string rangeFN = m_slotRangeMap[ slot ];
287  if( count_events_in_range(rangeFN) == 1 ) {
288  finalizeRange( rangeFN );
   // Clear this slot's range name (the map entry itself is kept)
289  m_slotRangeMap[ slot ].clear();
290  }
291  }
292  }
293  }
294  ATH_MSG_DEBUG(std::format("Leaving incident handler for {}", inc.type()));
295 }
296 
297 // Note - this method works in any slot - MetaCont uses the filenames to find objects
// Close one Event Service output range: write its metadata, finalize and drop
// the stream tool registered for it in m_streamerMap, then publish the range
// report through the output sequencer service.
//
// @param rangeFN  Output file name identifying the range. It may refer to a
//                 key stored inside m_streamerMap (handle() passes
//                 m_streamerMap.begin()->first), so it is copied before the
//                 map is modified.
//
// Failures of finalizeOutput()/finalize() on the stream tool are deliberately
// ignored, as before; exceptions thrown by writeMetaData() propagate.
void AthenaOutputStream::finalizeRange( const std::string & rangeFN )
{
   // Private copy: the erase() below destroys the map key that rangeFN may
   // alias, and the name is still needed for publishRangeReport() afterwards.
   const std::string rangeName = rangeFN;

   ATH_MSG_DEBUG(std::format("Writing MetaData to {}", rangeName));
   // MN: not calling StopMetaData Incident here but directly writeMetaData() - OK for Sim, check others
   // metadata tools like CutFlowSvc are not able to handle this yet
   const std::string rememberID = m_outSeqSvc->setRangeID( m_rangeIDforRangeFN[ rangeName ] );
   writeMetaData( rangeName );
   m_outSeqSvc->setRangeID( rememberID );

   ATH_MSG_INFO(std::format("Finished writing Event Sequence to {}", rangeName));
   auto strm_iter = m_streamerMap.find( rangeName );
   if( strm_iter != m_streamerMap.end() ) {
      // The entry can exist with an empty pointer (operator[] on the map
      // default-constructs it), so only finalize a tool that is really there.
      if( strm_iter->second ) {
         strm_iter->second->finalizeOutput().ignore();
         strm_iter->second->finalize().ignore();
      }
      // Always erase: handle() loops until m_streamerMap is empty.
      m_streamerMap.erase( strm_iter );
   } else {
      ATH_MSG_WARNING(std::format("No output stream tool registered for {}", rangeName));
   }
   m_outSeqSvc->publishRangeReport( rangeName );
}
314 
315 // Method to write MetaData for this stream
316 // in ES mode the range substream is determined by the current Event slot
317 // called from the incident handler - returns void and throws GaudiExceptions on errors
// NOTE(review): the signature line is not visible here; the body reads a
// string `outputFN` that selects the stream tool.
// Visible steps: run preFinalize() on the helper tools, ask MetaDataSvc to
// prepare the output, fire a "WriteDataHeaderForms" FileIncident, and - when a
// metadata item list is configured - point the stream tool and the
// m_p2BWritten folder at the metadata items, call write(), fire a second
// incident and restore the item list.
319 {
320  // use main stream tool by default, or per outputFile in ES mode
   // NOTE(review): m_streamerMap[outputFN] looks like an associative-container
   // operator[], which would create an empty entry for an unknown name - confirm.
321  IAthenaOutputStreamTool* streamer = outputFN.empty()? &*m_streamer : m_streamerMap[outputFN].get();
322 
323  for (auto& tool : m_helperTools) {
324  if (!tool->preFinalize().isSuccess()) {
325  throw GaudiException("Cannot finalize helper tool", name(), StatusCode::FAILURE);
326  }
327  }
328  if( m_metaDataSvc->prepareOutput(outputFN).isFailure() ) {
329  throw GaudiException("Failed on MetaDataSvc prepareOutput", name(), StatusCode::FAILURE);
330  }
331  // lock all metadata to prevent updates during writing
   // NOTE(review): the locking statement itself is not visible here.
333 
334  // Prepare the WriteDataHeaderForms incident
335  std::string DHFWriteIncidentfileName = m_outSeqSvc->buildSequenceFileName(m_outputName);
336  // remove technology from the name
   // i.e. keep only what follows the first ':' when there is one
337  size_t pos = DHFWriteIncidentfileName.find(':');
338  if( pos != std::string::npos ) DHFWriteIncidentfileName = DHFWriteIncidentfileName.substr(pos+1);
339  FileIncident incident(name(), "WriteDataHeaderForms", DHFWriteIncidentfileName);
340  m_incidentSvc->fireIncident(incident);
341 
342  ATH_MSG_DEBUG("metadataItemList: " << m_metadataItemList.value() );
343  if (!m_metadataItemList.value().empty()) {
   // Connect the stream tool to the metadata store (third argument is false here)
345  StatusCode status = streamer->connectServices(m_metadataStore.typeAndName(), m_persName, false);
346  if (status.isFailure()) {
347  throw GaudiException("Unable to connect metadata services", name(), StatusCode::FAILURE);
348  }
   // write() appends m_outputAttributes to the file name to form its connect string
349  m_outputAttributes = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
   // Replace the folder content with the metadata item list for this write
350  m_p2BWritten->clear();
351  IProperty *pAsIProp(nullptr);
352  if ((m_p2BWritten.retrieve()).isFailure() ||
353  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
354  (pAsIProp->setProperty("ItemList", m_metadataItemList.toString())).isFailure()) {
355  throw GaudiException("Folder property [metadataItemList] not found", name(), StatusCode::FAILURE);
356  }
357  if (write().isFailure()) {
358  throw GaudiException("Cannot write metadata", name(), StatusCode::FAILURE);
359  }
   // Second incident, this time with the metadata output attributes appended
360  FileIncident incident(name(), "WriteDataHeaderForms", DHFWriteIncidentfileName + m_outputAttributes);
361  m_incidentSvc->fireIncident(incident);
362 
363  m_outputAttributes.clear();
   // NOTE(review): the statement that refreshes `status` before the check
   // below is not visible here.
366  if (status.isFailure()) {
367  throw GaudiException("Unable to re-connect services", name(), StatusCode::FAILURE);
368  }
   // Restore the regular item list in the folder
369  m_p2BWritten->clear();
370  if ((pAsIProp->setProperty(m_itemList)).isFailure()) {
371  throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
372  }
373  ATH_MSG_DEBUG(std::format("Metadata items written: {}", m_metadataItemList.value().size()));
374  }
375 }
376 
377 // Terminate data writer
// NOTE(review): the signature line is not visible here.
// Calls finalizeOutput() on the main stream tool, then releases the helper
// tools and the stream tool. All three steps are attempted even if an earlier
// one fails; any failure yields StatusCode::FAILURE, and in that case the
// object caches are left untouched. On success the caches are cleared.
379 {
380  bool failed = false;
381  ATH_MSG_DEBUG("finalize: Optimize output");
382  // Connect the output file to the service
383  if (!m_streamer->finalizeOutput().isSuccess()) {
384  failed = true;
385  }
386  ATH_MSG_DEBUG("finalize: end optimize output");
387  // Release the tools
388  if (!m_helperTools.release().isSuccess()) {
389  failed = true;
390  }
391  if (!m_streamer.release().isSuccess()) {
392  failed = true;
393  }
394  if (failed) {
395  return(StatusCode::FAILURE);
396  }
397  // Clear the internal caches
398  m_objects.clear();
399  m_objects.shrink_to_fit();
400  m_ownedObjects.clear();
401  m_altObjects.clear();
402  return(StatusCode::SUCCESS);
403 }
404 
405 // Execute data writer
// NOTE(review): the signature line is not visible here.
// Runs preExecute() on every helper tool, writes the event when
// isEventAccepted() is true, then runs postExecute() on every helper tool.
// Failures are accumulated rather than returned immediately, so all stages
// are attempted; the result is FAILURE if any stage failed.
407  bool failed = false;
408  // Call tool preExecute prior to writing
409  for (auto& tool : m_helperTools) {
410  if (!tool->preExecute().isSuccess()) {
411  failed = true;
412  }
413  }
414  // Write the event if the event is accepted
415  if (isEventAccepted()) {
416  if (write().isFailure()) {
417  failed = true;
418  }
419  }
420  // Call tool postExecute after writing
421  for (auto& tool : m_helperTools) {
422  if(!tool->postExecute().isSuccess()) {
423  failed = true;
424  }
425  }
426  // See if we should write metadata and do if so
   // NOTE(review): the condition that opens this block is not visible here.
428  writeMetaData();
430  // finalize will disconnect output
431  if( !finalize().isSuccess() ) {
432  failed = true;
433  }
434  }
435  if (failed) {
436  return(StatusCode::FAILURE);
437  }
438  return(StatusCode::SUCCESS);
439 }
440 
441 // The main method that performs the writing
// NOTE(review): the signature line is not visible here.
// Under m_mutex: picks the stream tool (the main one, or a per-file tool
// created on demand when the output sequencer runs in concurrent-events
// mode), resets the selection, runs preStream() on the helper tools, connects
// the output, streams the objects and calls commitOutput(). m_events is
// incremented only on success.
443  bool failed = false;
444  IAthenaOutputStreamTool* streamer = &*m_streamer;
445  std::string outputFN;
446 
447  std::unique_lock<mutex_t> lock(m_mutex);
448  outputFN = m_outSeqSvc->buildSequenceFileName( m_outputName );
449 
450  // Handle Event Ranges
451  if( m_outSeqSvc->inUse() and m_outSeqSvc->inConcurrentEventsMode() ) {
452  ATH_MSG_DEBUG(std::format("Writing event sequence to {}", outputFN));
453  streamer = m_streamerMap[ outputFN ].get();
454  if( !streamer ) {
455  // new range, needs a new streamer tool
   // NOTE(review): `st` is used on the next line without a null check.
456  IAlgTool* st = AlgTool::Factory::create( m_streamer->type(), m_streamer->type(), m_streamer->name(), this ).release();
457  st->addRef();
458  streamer = dynamic_cast<IAthenaOutputStreamTool*>( st );
459  IProperty *mstreamer_props = dynamic_cast<IProperty*> (&*m_streamer);
460  IProperty *streamer_props = dynamic_cast<IProperty*> (&*streamer);
461  if (!mstreamer_props || !streamer_props) {
462  ATH_MSG_FATAL("Cannot cast streamer to IProperty");
463  return StatusCode::FAILURE;
464  }
   // Copy every property of the main stream tool onto the new tool
465  for ( const auto& prop : mstreamer_props->getProperties() ) {
466  ATH_CHECK( streamer_props->setProperty( *prop ) );
467  }
468  if( !streamer or streamer->initialize().isFailure()
469  or streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord).isFailure() ) {
470  ATH_MSG_FATAL(std::format("Unable to initialize OutputStreamTool for {}", outputFN));
471  return StatusCode::FAILURE;
472  }
   // Register the new tool for this output file name
473  m_streamerMap[ outputFN ].reset( streamer );
474  }
475  }
476 
477  // Clear any previously existing item list
478  // and collect all objects that are asked to be written out
   // NOTE(review): the collection call announced above is not visible here.
479  clearSelection();
481 
482  // keep a local copy of the object lists so they are not overwritten when we release the lock
483  IDataSelector objects = std::move( m_objects );
484  IDataSelector altObjects = std::move( m_altObjects );
485  std::vector<std::unique_ptr<DataObject> > ownedObjects = std::move( m_ownedObjects );
486 
487  // prepare before releasing lock because m_outputAttributes change in metadataStop
488  const std::string connectStr = outputFN + m_outputAttributes;
489 
490  for (auto& tool : m_helperTools) {
491  ATH_CHECK( tool->preStream() );
492  }
493 
494  // MN: would be nice to release the Stream lock here
495  // lock.unlock();
496 
497  // Connect the output file to the service
498  if (!streamer->connectOutput(connectStr).isSuccess()) {
499  ATH_MSG_FATAL("Could not connectOutput");
500  return StatusCode::FAILURE;
501  }
502  ATH_MSG_DEBUG(std::format("connectOutput done for {}", outputFN));
503  StatusCode currentStatus = streamer->streamObjects(objects, connectStr);
504  // Do final check of streaming
   // A recoverable status is only logged at DEBUG level and does not fail the write
505  if (!currentStatus.isSuccess()) {
506  if (!currentStatus.isRecoverable()) {
507  ATH_MSG_FATAL("streamObjects failed.");
508  failed = true;
509  } else {
510  ATH_MSG_DEBUG("streamObjects failed.");
511  }
512  }
   // commitOutput() is called even when streaming failed
513  bool doCommit = false;
514  if (!streamer->commitOutput(doCommit).isSuccess()) {
515  ATH_MSG_FATAL("commitOutput failed.");
516  failed = true;
517  }
518  if (failed) {
519  return(StatusCode::FAILURE);
520  }
521  m_events++;
522  return(StatusCode::SUCCESS);
523 }
524 
525 // Clear collected object list
// NOTE(review): the signature line is not visible here.
// Empties the three selection containers: m_objects, m_ownedObjects and
// m_altObjects.
527  m_objects.clear();
528  m_ownedObjects.clear();
529  m_altObjects.clear();
530 }
531 
532 // Collect objects
// NOTE(review): the signature line is not visible here.
// Optionally refreshes the item list from the stream tool, then calls
// addItemObjects() for every item of m_p2BWritten. The selection vetoes and
// the compression information are recorded through SG::makeHandle only when
// they are non-empty.
534  if (m_itemListFromTool) {
   // Failing to get the list from the tool is only a warning
535  if (!m_streamer->getInputItemList(&*m_p2BWritten).isSuccess()) {
536  ATH_MSG_WARNING("collectAllObjects() could not get ItemList from Tool.");
537  }
538  }
539 
540  // This holds the vetoes for the AuxID selection
541  auto vetoes = std::make_unique<SG::SelectionVetoes>();
542  // This holds the lossy float compression information
543  auto compInfo = std::make_unique<SG::CompressionInfo>();
544 
545  m_p2BWritten->updateItemList(true);
546  // Collect all objects that need to be persistified:
547  for (const auto& i : *m_p2BWritten) {
548  ATH_CHECK( addItemObjects(i, *vetoes, *compInfo) );
549  }
550 
551  // If there were any variable selections, record the information in SG.
552  if (!vetoes->empty()) {
553  ATH_CHECK( SG::makeHandle (m_selVetoesKey).record (std::move (vetoes)) );
554  }
555 
556  // Store the lossy float compression information in the SG.
557  if (!compInfo->empty()) {
558  ATH_CHECK( SG::makeHandle (m_compInfoKey).record (std::move (compInfo)) );
559  }
560 
561  return StatusCode::SUCCESS;
562 }
563 
564 // Build a list of objects we're going to write out
565 // This function also builds the list of vetoed AuxIDs
566 // and the lossy float compression lists.
// NOTE(review): the first signature line is not visible here; the body reads
// an `item` exposing key(), id() and exact().
// For one item-list entry: finds the matching proxies in the current store,
// appends their objects to m_objects (wrapped in an AltDataBucket for remapped
// MetaCont items and for items with the exact flag), and fills `vetoes` and
// `compInfo` for event-store keys that pass the aux-postfix test below.
// Returns FAILURE only when a MetaCont entry cannot be retrieved; not finding
// any proxy is merely logged.
568  SG::SelectionVetoes& vetoes,
569  SG::CompressionInfo& compInfo)
570 {
571  // anything after a dot is a list of dynamic Aux attributes, separated by dots
   // Note that item_key keeps the first dot (substr(0, dotpos+1))
572  size_t dotpos = item.key().find('.');
573  std::string item_key, aux_attr;
574  if( dotpos != std::string::npos ) {
575  item_key = item.key().substr(0, dotpos+1);
576  aux_attr = item.key().substr(dotpos+1);
577  } else {
578  item_key = item.key();
579  }
580  CLID item_id = item.id();
581  ATH_MSG_DEBUG(std::format("addItemObjects({},\"{}\") called", item_id, item_key));
582  ATH_MSG_DEBUG(std::format(" Key:{}", item_key));
583  if( aux_attr.size() ) {
584  ATH_MSG_DEBUG(std::format(" Aux Attr:{}", aux_attr));
585  }
586 
587  // Here we build the list of attributes for the lossy float compression
588  // Note that we do not allow m_compressionBitsHigh >= m_compressionBitsLow
589  // Otherwise is, in any case, a logical error and they'd potentially overwrite each other
590  std::map< unsigned int, std::set< std::string > > comp_attr_map;
591  comp_attr_map[ m_compressionBitsHigh ] = buildCompressionSet( m_compressionDecoderHigh, item_id, item_key );
592  comp_attr_map[ m_compressionBitsLow ] = buildCompressionSet( m_compressionDecoderLow, item_id, item_key );
593 
594  // Print some debugging information regarding the lossy float compression configuration
595  for( const auto& it : comp_attr_map ) {
596  ATH_MSG_DEBUG(std::format(" Comp Attr {} with {} mantissa bits.", it.second.size(), it.first));
597  if ( it.second.size() > 0 ) {
598  for( const auto& attr : it.second ) {
599  ATH_MSG_DEBUG(std::format(" >> {}", attr));
600  }
601  }
602  }
603 
604  // For MetaData objects of type T that are kept in MetaContainers get the MetaCont<T> ID
605  const CLID remapped_item_id = m_metaDataSvc->remapMetaContCLID( item_id );
   // NOTE(review): the declaration of `iter` and `end` is not visible here.
607  SG::ProxyMap map;
608  bool gotProxies = false;
609  // Look for the clid in storegate
   // First try a direct (id, key) lookup; a hit is iterated via the local one-entry map
610  SG::DataProxy* match = (*m_currentStore)->proxy(remapped_item_id, item_key, true);
611  if (match != nullptr) {
612  map.insert({item_key, match});
613  iter = map.begin();
614  end = map.end();
615  gotProxies = true;
616  }
617  // Look for the clid in storegate
   // Otherwise fall back to the range of proxies returned for this id
618  if (!gotProxies && ((*m_currentStore)->proxyRange(remapped_item_id, iter, end)).isSuccess()) {
619  gotProxies = true;
620  }
621  if (gotProxies) {
622  bool added = false, removed = false;
623  // Now loop over any found proxies
624  for (; iter != end; ++iter) {
625  SG::DataProxy* itemProxy(iter->second);
626  std::string proxyName = itemProxy->name();
627  std::string stream;
628  if( m_currentStore == &m_metadataStore ) {
629  // only check metadata keys
630  stream = m_metaDataSvc->removeStreamFromKey(proxyName); // can modify proxyName
631  }
632  // Does this key match the proxy key name - allow for wildcarding and aliases
633  bool keyMatch = ( item_key == "*" ||
634  item_key == proxyName ||
635  itemProxy->hasAlias(item_key) );
636  if (!keyMatch) {
637  // For item list we currently allow wildcards ('*'), which has limited use, e.g.:
638  // xAOD::CutBookkeeperAuxContainer#IncompleteCutBookkeepers*Aux.
639  // Here we look for those few cases...
640  keyMatch = simpleMatch(item_key, proxyName);
641  ATH_MSG_DEBUG(std::format("Result of checking {} against {} to see if it matches is {}",
642  proxyName, item_key, keyMatch));
643  }
644 
645  // Now check if this item is marked for another output stream, if so we reject it
646  // We also reject keys that are marked transient at this point
647  bool xkeyMatch = false;
648  if( (!stream.empty() and stream != m_outputName) || SG::isTransientKey(proxyName) ) {
649  // reject keys that are marked for a different output stream
650  ATH_MSG_DEBUG(std::format("Rejecting key: {} in output: {}", itemProxy->name(), m_outputName.toString()));
651  xkeyMatch = true;
652  }
653 
654  // All right, it passes key match find in itemList, but not in excludeList
655  if (keyMatch && !xkeyMatch) {
   // With m_forceRead set, accessData() is called on valid proxies; a null
   // result is logged as an error and the proxy is then skipped by the
   // object() check below unless it already holds an object
656  if (m_forceRead && itemProxy->isValid()) {
657  if (nullptr == itemProxy->accessData()) {
658  ATH_MSG_ERROR(std::format(" Could not get data object for id {},\"{}\"", remapped_item_id, proxyName));
659  }
660  }
661  if (nullptr != itemProxy->object()) {
   // Skip objects already selected, directly or through an alternate bucket
662  if( std::find(m_objects.begin(), m_objects.end(), itemProxy->object()) == m_objects.end() &&
663  std::find(m_altObjects.begin(), m_altObjects.end(), itemProxy->object()) == m_altObjects.end() )
664  {
665  if( item_id != remapped_item_id ) {
666  // For MetaCont<T>: -
667  // create a temporary DataObject for an entry in the container to pass to CnvSvc
668  DataBucketBase* dbb = static_cast<DataBucketBase*>( itemProxy->object() );
669  const MetaContBase* metaCont = static_cast<MetaContBase*>( dbb->cast( ClassID_traits<MetaContBase>::ID() ) );
670  void* obj = metaCont? metaCont->getAsVoid( m_outSeqSvc->currentRangeID() ) : nullptr;
671  if( obj ) {
   // m_objects holds the raw pointer; m_ownedObjects keeps the bucket alive
672  auto altbucket = std::make_unique<AltDataBucket>(
673  obj, item_id, *CLIDRegistry::CLIDToTypeinfo(item_id), proxyName );
674  m_objects.push_back( altbucket.get() );
675  m_ownedObjects.push_back( std::move(altbucket) );
676  m_altObjects.push_back( itemProxy->object() ); // only for duplicate prevention
677  } else {
678  ATH_MSG_ERROR(std::format("Failed to retrieve object from MetaCont with key={}, for EventRangeID={}",
679  item_key, m_outSeqSvc->currentRangeID()));
680  return StatusCode::FAILURE;
681  }
682  } else if (item.exact()) {
683  // If the exact flag is set, make a new DataObject
684  // holding the object as the requested type.
685  DataBucketBase* dbb = dynamic_cast<DataBucketBase*> (itemProxy->object());
686  if (!dbb) std::abort();
687  void* ptr = dbb->cast (item_id);
688  if (!ptr) {
689  // Hard cast
690  ptr = dbb->object();
691  }
692  auto altbucket =
693  std::make_unique<AltDataBucket>
694  (ptr, item_id,
695  *CLIDRegistry::CLIDToTypeinfo (item_id),
696  *itemProxy);
697  m_objects.push_back(altbucket.get());
698  m_ownedObjects.push_back (std::move(altbucket));
699  m_altObjects.push_back (itemProxy->object());
700  }
701  else
702  m_objects.push_back(itemProxy->object());
703  ATH_MSG_DEBUG(std::format(" Added object {},\"{}\"", item_id, proxyName));
704  }
705 
   // Variable selection and compression: event store only, and only when the
   // aux postfix is found at position size()-4 of item_key.
   // NOTE(review): the literal 4 presumably equals the length of
   // RootAuxDynIO::AUX_POSTFIX - confirm. For a 3-character key without the
   // postfix, size()-4 wraps around to std::string::npos and the comparison
   // succeeds; key.erase(key.size()-4, 4) further down would then be out of
   // range.
709  if ((*m_currentStore)->storeID() == StoreID::EVENT_STORE &&
710  item_key.find( RootAuxDynIO::AUX_POSTFIX ) == ( item_key.size() - 4 )) {
711 
712  const SG::IConstAuxStore* auxstore( nullptr );
713  try {
714  SG::fromStorable( itemProxy->object(), auxstore, true );
715  } catch( const std::exception& ) {
716  ATH_MSG_DEBUG(std::format("Error in casting object with CLID {} to SG::IConstAuxStore*", itemProxy->clID()));
717  auxstore = nullptr;
718  }
719 
720  if (auxstore) {
721  handleVariableSelection (*auxstore, *itemProxy,
722  aux_attr, vetoes);
723 
724  // Here comes the compression logic using ThinningInfo
725  // Get a hold of all AuxIDs for this store (static, dynamic etc.)
726  const SG::auxid_set_t allVars = auxstore->getAuxIDs();
727 
728  // Get a handle on the compression information for this store
   // i.e. item_key without its last four characters
729  std::string key = item_key;
730  key.erase (key.size()-4, 4);
731 
732  // Build the compression list, retrieve the relevant AuxIDs and
733  // store it in the relevant map that is going to be inserted into
734  // the ThinningCache later on by the ThinningCacheTool
   // NOTE(review): the declaration of `compression` is not visible here.
736  compression.setCompressedAuxIDs( comp_attr_map );
737  for( const auto& it : compression.getCompressedAuxIDs( allVars ) ) {
738  if( it.second.size() > 0 ) { // insert only if the set is non-empty
739  compInfo[ key ][ it.first ] = it.second;
740  ATH_MSG_DEBUG(std::format("Container {} has {} variables that'll be "
741  "lossy float compressed with {} mantissa bits",
742  key, it.second.size(), it.first));
743  }
744  } // End of loop over variables to be lossy float compressed
745  } // End of lossy float compression logic
746 
747  }
748 
749  added = true;
750  }
751  } else if (keyMatch && xkeyMatch) {
752  removed = true;
753  }
754  } // proxy loop
755  if (!added && !removed) {
756  ATH_MSG_DEBUG(std::format(" No object matching {},\"{}\" found", item_id, item_key));
757  } else if (removed) {
758  ATH_MSG_DEBUG(std::format(" Object being excluded based on property setting {},\"{}\". Skipping",
759  item_id, item_key));
760  }
761  } else {
762  ATH_MSG_DEBUG(std::format(" Failed to receive proxy iterators from StoreGate for {},\"{}\". Skipping",
763  item_id, item_key));
764  }
765  return StatusCode::SUCCESS;
766 }
767 
773 std::set<std::string>
774 AthenaOutputStream::buildCompressionSet (const ToolHandle<SG::IFolder>& handle,
775  const CLID& item_id,
776  const std::string& item_key) const
777 {
778  // Create an empty result
779  std::set<std::string> result;
780 
781  // Check the item is indeed Aux.
782  if(item_key.find("Aux.") == std::string::npos) {
783  return result;
784  }
785 
786  // First the high compression list
787  for (const auto& iter : *handle) {
788  // First match the IDs for early rejection.
789  if (iter.id() != item_id) {
790  continue;
791  }
792  // Then find the compression item key and the compression list string
793  size_t seppos = iter.key().find('.');
794  std::string comp_item_key{""}, comp_str{""};
795  if(seppos != std::string::npos) {
796  comp_item_key = iter.key().substr(0, seppos+1);
797  comp_str = iter.key().substr(seppos+1);
798  } else {
799  comp_item_key = iter.key();
800  }
801  // Proceed only if the keys match and the
802  // compression list string is not empty
803  if (!comp_str.empty() && comp_item_key == item_key) {
804  std::stringstream ss(comp_str);
805  std::string attr;
806  while( std::getline(ss, attr, '.') ) {
807  result.insert(attr);
808  }
809  }
810  }
811 
812  // All done, return the result
813  return result;
814 }
815 
// NOTE(review): the first signature line is not visible here; the body reads
// an `auxstore` exposing getAuxIDs().
// Parses `aux_attr` (attribute names separated by dots) and, when it names at
// least one attribute, inserts into `vetoes` every AuxID of the store that the
// selection does not select. The vetoes entry is keyed by the proxy name with
// a trailing "Aux." removed.
818  SG::DataProxy& itemProxy,
819  const std::string& aux_attr,
820  SG::SelectionVetoes& vetoes) const
821 {
822  // Collect dynamic Aux selection (parse the line, attributes separated by dot)
823  std::set<std::string> attributes;
824  if( aux_attr.size() ) {
825  std::stringstream ss(aux_attr);
826  std::string attr;
827  while( std::getline(ss, attr, '.') ) {
828  attributes.insert(attr);
829  }
830  }
831 
832  // Return early if there's no selection.
833  if (attributes.empty()) {
834  return;
835  }
836 
   // Strip a trailing "Aux." from the proxy name to form the vetoes key
837  std::string key = itemProxy.name();
838  if (key.size() >= 4 && key.compare (key.size()-4, 4, "Aux.")==0)
839  {
840  key.erase (key.size()-4, 4);
841  }
842 
843  // Find the entry for the selection.
   // vset is a reference into `vetoes`, so the inserts below update it in place
844  SG::auxid_set_t& vset = vetoes[key];
845 
846  // Form the veto mask for this object.
   // NOTE(review): the declaration of `sel` is not visible here.
848  sel.selectAux (attributes);
849 
850  // Get all the AuxIDs that we know of and the selected ones
851  SG::auxid_set_t all = auxstore.getAuxIDs();
852  SG::auxid_set_t selected = sel.getSelectedAuxIDs( all );
853 
854  // Loop over all and build a list of vetoed AuxIDs from non selected ones
855  for( const SG::auxid_t auxid : all ) {
856  if ( !selected.test( auxid ) ) {
857  vset.insert( auxid );
858  }
859  }
860 }
861 
862 // Here comes the list handlers...
863 void AthenaOutputStream::itemListHandler(Gaudi::Details::PropertyBase& /* theProp */) {
864  // Assuming concrete SG::Folder also has an itemList property
865  IProperty *pAsIProp(nullptr);
866  if ((m_p2BWritten.retrieve()).isFailure() ||
867  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
868  (pAsIProp->setProperty(m_itemList)).isFailure()) {
869  throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
870  }
871 }
872 
873 void AthenaOutputStream::compressionListHandlerHigh(Gaudi::Details::PropertyBase& /* theProp */) {
874  IProperty *pAsIProp(nullptr);
875  if ((m_compressionDecoderHigh.retrieve()).isFailure() ||
876  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_compressionDecoderHigh)) ||
877  (pAsIProp->setProperty("ItemList", m_compressionListHigh.toString())).isFailure()) {
878  throw GaudiException("Folder property [ItemList] not found", name(), StatusCode::FAILURE);
879  }
880 }
881 
882 void AthenaOutputStream::compressionListHandlerLow(Gaudi::Details::PropertyBase& /* theProp */) {
883  IProperty *pAsIProp(nullptr);
884  if ((m_compressionDecoderLow.retrieve()).isFailure() ||
885  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_compressionDecoderLow)) ||
886  (pAsIProp->setProperty("ItemList", m_compressionListLow.toString())).isFailure()) {
887  throw GaudiException("Folder property [ItemList] not found", name(), StatusCode::FAILURE);
888  }
889 }
890 
891 // Reinitialize the internal state of the algorithm
892 // for I/O purposes (e.g. upon @c fork(2))
894  ATH_MSG_INFO("I/O reinitialization...");
895  m_incidentSvc->removeListener(this, "MetaDataStop"); // Remove any existing listener to avoid handling the incident multiple times
896  m_incidentSvc->addListener(this, "MetaDataStop", 50);
897  for (auto& tool : m_helperTools) {
898  if (!tool->postInitialize().isSuccess()) {
899  ATH_MSG_ERROR("Cannot initialize helper tool");
900  }
901  }
902  return StatusCode::SUCCESS;
903 }
904 
905 // Finalize the internal state of the algorithm
906 // for I/O purposes (e.g. upon @c fork(2))
908  ATH_MSG_INFO("I/O finalization...");
909  for (auto& tool : m_helperTools) {
910  if (!tool->preFinalize().isSuccess()) {
911  ATH_MSG_ERROR("Cannot finalize helper tool");
912  }
913  }
914  const Incident metaDataStopIncident(name(), "MetaDataStop");
915  this->handle(metaDataStopIncident);
916  m_incidentSvc->removeListener(this, "MetaDataStop");
917  if (m_dataStore->clearStore().isFailure()) {
918  ATH_MSG_WARNING("Cannot clear the DataStore");
919  }
920  return StatusCode::SUCCESS;
921 }
922 
929 {
930  m_dictLoader->load_type (clid);
931 
932  // Also load the persistent class dictionary, if applicable.
933  std::unique_ptr<ITPCnvBase> tpcnv = m_tpCnvSvc->t2p_cnv_unique (clid);
934  if (tpcnv) {
935  m_dictLoader->load_type (tpcnv->persistentTInfo());
936  }
937 }
938 
940 bool AthenaOutputStream::simpleMatch(const std::string& pattern,
941  const std::string& text) {
942  size_t pi = 0, ti = 0, star = std::string::npos, match = 0;
943  while (ti < text.size()) {
944  if (pi < pattern.size() && (pattern[pi] == text[ti] || pattern[pi] == '*')) {
945  if (pattern[pi] == '*') { star = pi++; match = ti; }
946  else { ++pi; ++ti; }
947  } else if (star != std::string::npos) {
948  pi = star + 1;
949  ti = ++match;
950  } else return false;
951  }
952  while (pi < pattern.size() && pattern[pi] == '*') ++pi;
953  return pi == pattern.size();
954 }
mergePhysValFiles.pattern
pattern
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:25
AthenaOutputStream::finalize
virtual StatusCode finalize() override
Definition: AthenaOutputStream.cxx:378
AthenaOutputStream::m_rangeIDforRangeFN
std::map< std::string, std::string > m_rangeIDforRangeFN
map of RangeIDs (as used by the Sequencer) for each Range filename generated
Definition: AthenaOutputStream.h:199
createLinkingScheme.iter
iter
Definition: createLinkingScheme.py:62
AthenaOutputStream::writeMetaData
void writeMetaData(const std::string &outputFN="")
Write MetaData for this stream (by default) or for a substream outputFN (in ES mode)
Definition: AthenaOutputStream.cxx:318
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
AthenaOutputStream::m_outputAttributes
std::string m_outputAttributes
Output attributes.
Definition: AthenaOutputStream.h:231
SGIFolder.h
get_generator_info.result
result
Definition: get_generator_info.py:21
RootAuxDynIO::AUX_POSTFIX
constexpr char AUX_POSTFIX[]
Common post-fix for the names of auxiliary containers in StoreGate.
Definition: RootAuxDynDefs.h:12
PowhegControl_ttHplus_NLO.ss
ss
Definition: PowhegControl_ttHplus_NLO.py:83
LArConditions2Ntuple.objects
objects
Definition: LArConditions2Ntuple.py:64
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
find
std::string find(const std::string &s)
return a remapped string
Definition: hcg.cxx:135
AthenaOutputStream::addItemObjects
StatusCode addItemObjects(const SG::FolderItem &, SG::SelectionVetoes &vetoes, SG::CompressionInfo &compInfo)
Add item data objects to output streamer list.
Definition: AthenaOutputStream.cxx:567
DataBucketBase
A non-templated base class for DataBucket, allows to access the transient object address as a void*.
Definition: DataBucketBase.h:24
SG::isTransientKey
bool isTransientKey(const std::string &key)
Test to see if a key is transient.
Definition: transientKey.h:41
SG::fromStorable
bool fromStorable(DataObject *pDObj, T *&pTrans, bool quiet=false, IRegisterTransient *irt=0, bool isConst=true)
Definition: StorableConversions.h:169
AthenaOutputStream::m_itemList
StringArrayProperty m_itemList
Vector of item names.
Definition: AthenaOutputStream.h:115
AthenaOutputStream::buildCompressionSet
std::set< std::string > buildCompressionSet(const ToolHandle< SG::IFolder > &handle, const CLID &item_id, const std::string &item_key) const
Helper function for building the compression lists.
Definition: AthenaOutputStream.cxx:774
AthenaOutputStream::m_dataStore
ServiceHandle< StoreGateSvc > m_dataStore
Handle to the StoreGateSvc store where the data we want to write out resides.
Definition: AthenaOutputStream.h:99
AthenaOutputStream::m_compressionListLow
StringArrayProperty m_compressionListLow
Vector of item names.
Definition: AthenaOutputStream.h:128
DataBucketBase::object
virtual void * object()=0
SG::SelectionVetoes
std::unordered_map< std::string, SG::auxid_set_t > SelectionVetoes
Map of vetoed variables.
Definition: SelectionVetoes.h:50
IAthenaOutputStreamTool
This is a tool that allows streaming out of DataObjects. This has been factorized out from AthenaOutp...
Definition: IAthenaOutputStreamTool.h:69
skel.it
it
Definition: skel.GENtoEVGEN.py:407
pool::WRITE
@ WRITE
Definition: Database/APR/StorageSvc/StorageSvc/pool.h:72
SG::DataProxy::accessData
DataObject * accessData()
Access DataObject on-demand using conversion service.
CLIDRegistry::CLIDToTypeinfo
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
Definition: CLIDRegistry.cxx:136
AthenaOutputStream::initialize
virtual StatusCode initialize() override
Definition: AthenaOutputStream.cxx:73
AthenaOutputStream::m_forceRead
BooleanProperty m_forceRead
set to true to force read of data objects in item list
Definition: AthenaOutputStream.h:147
SG::DataProxy::isValid
bool isValid() const
called by destructor
python.RatesEmulationExample.lock
lock
Definition: RatesEmulationExample.py:148
AthenaOutputStream::loadDict
void loadDict(CLID clid)
Helper function to load dictionaries (both transient and persistent) for a given type.
Definition: AthenaOutputStream.cxx:928
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
AthenaOutputStream.h
dbg::ptr
void * ptr(T *p)
Definition: SGImplSvc.cxx:74
FilteredAlgorithm::initialize
virtual StatusCode initialize()
Definition: FilteredAlgorithm.cxx:31
ITPCnvBase.h
DataBucketBase::cast
T * cast(SG::IRegisterTransient *irt=0, bool isConst=true)
Return the contents of the DataBucket, converted to type T.
AthenaOutputStream::m_transientItems
StringArrayProperty m_transientItems
List of items that are known to be present in the transient store (and hence we can make input depend...
Definition: AthenaOutputStream.h:138
AthenaOutputStream::m_objects
IDataSelector m_objects
Collection of objects being selected.
Definition: AthenaOutputStream.h:172
AthenaOutputStream::m_metadataItemList
StringArrayProperty m_metadataItemList
Vector of item names.
Definition: AthenaOutputStream.h:118
xAOD::AuxCompression
Definition: AuxCompression.h:20
AthenaOutputStream::m_currentStore
ServiceHandle< StoreGateSvc > * m_currentStore
Definition: AthenaOutputStream.h:101
AthenaPoolTestWrite.stream
string stream
Definition: AthenaPoolTestWrite.py:12
Trk::u
@ u
Enums for curvilinear frames.
Definition: ParamDefs.h:77
transientKey.h
ProxyMap.h
pi
#define pi
Definition: TileMuonFitter.cxx:65
mergePhysValFiles.end
end
Definition: DataQuality/DataQualityUtils/scripts/mergePhysValFiles.py:92
SG::ProxyMap
std::map< std::string, DataProxy * > ProxyMap
Definition: ProxyMap.h:24
AthenaOutputStream::collectAllObjects
StatusCode collectAllObjects()
Collect data objects for output streamer list.
Definition: AthenaOutputStream.cxx:533
SG::makeHandle
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
Definition: ReadCondHandle.h:274
MetaContBase
Definition: MetaCont.h:24
AthenaOutputStream::m_compressionListHigh
StringArrayProperty m_compressionListHigh
Vector of item names.
Definition: AthenaOutputStream.h:125
PyPoolBrowser.item
item
Definition: PyPoolBrowser.py:129
AthenaOutputStream::m_streamer
ToolHandle< IAthenaOutputStreamTool > m_streamer
pointer to AthenaOutputStreamTool
Definition: AthenaOutputStream.h:183
AthenaOutputStream::m_outSeqSvc
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
Definition: AthenaOutputStream.h:109
WriteHandle.h
Handle class for recording to StoreGate.
SG::auxid_t
size_t auxid_t
Identifier for a particular aux data item.
Definition: AuxTypes.h:27
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
MetaDataSvc::ToolLockGuard
Definition: MetaDataSvc.h:243
AthenaOutputStream::AthenaOutputStream
AthenaOutputStream(const std::string &name, ISvcLocator *pSvcLocator)
Standard algorithm Constructor.
Definition: AthenaOutputStream.cxx:43
AthenaOutputStream::m_compressionBitsHigh
UnsignedIntegerProperty m_compressionBitsHigh
Number of mantissa bits in the float compression.
Definition: AthenaOutputStream.h:131
RootAuxDynDefs.h
lumiFormat.i
int i
Definition: lumiFormat.py:85
CxxUtils::ConcurrentBitset::insert
ConcurrentBitset & insert(bit_t bit, bit_t new_nbits=0)
Set a bit to 1.
AthenaOutputStream::m_slotRangeMap
std::map< unsigned, std::string > m_slotRangeMap
map of filenames assigned to active slots
Definition: AthenaOutputStream.h:196
IAuxStoreIO.h
Interface providing I/O for a generic auxiliary store.
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
python.getProblemFolderFromLogs.st
st
Definition: getProblemFolderFromLogs.py:68
sel
sel
Definition: SUSYToolsTester.cxx:92
AthenaOutputStream::m_outputName
StringProperty m_outputName
Name of the output file.
Definition: AthenaOutputStream.h:141
AthenaOutputStream::compressionListHandlerLow
void compressionListHandlerLow(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
Definition: AthenaOutputStream.cxx:882
ClassID_traits
Default, invalid implementation of ClassID_traits.
Definition: Control/AthenaKernel/AthenaKernel/ClassID_traits.h:37
calibdata.exception
exception
Definition: calibdata.py:495
IAthenaOutputStreamTool::connectOutput
virtual StatusCode connectOutput(const std::string &outputName="")=0
Connect to the output stream. Must connectOutput BEFORE streaming. Only specify "outputName" if one wan...
LHEF::Reader
Pythia8::Reader Reader
Definition: Prophecy4fMerger.cxx:11
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:194
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
IAthenaOutputStreamTool::connectServices
virtual StatusCode connectServices(const std::string &dataStore, const std::string &cnvSvc, bool extendProvenenceRecord=false)=0
Specify which data store and conversion service to use and whether to extend provenance. Only use if o...
python.getProblemFolderFromLogs.el
dictionary el
Definition: getProblemFolderFromLogs.py:48
AthenaOutputStream::m_helperTools
ToolHandleArray< IAthenaOutputTool > m_helperTools
vector of AlgTools that are executed by this stream
Definition: AthenaOutputStream.h:186
AthenaOutputStream::m_ownedObjects
std::vector< std::unique_ptr< DataObject > > m_ownedObjects
Collection of DataObject instances owned by this service.
Definition: AthenaOutputStream.h:180
SG::VarHandleKey::initialize
StatusCode initialize(bool used=true)
If this object is used as a property, then this should be called during the initialize phase.
Definition: AthToolSupport/AsgDataHandles/Root/VarHandleKey.cxx:103
AthenaOutputStream::compressionListHandlerHigh
void compressionListHandlerHigh(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
Definition: AthenaOutputStream.cxx:873
DataHeader.h
This file contains the class definition for the DataHeader and DataHeaderElement classes.
CLID
uint32_t CLID
The Class ID type.
Definition: Event/xAOD/xAODCore/xAODCore/ClassID_traits.h:47
AthenaOutputStream::io_finalize
virtual StatusCode io_finalize() override
Definition: AthenaOutputStream.cxx:907
AthenaOutputStream::m_persName
StringProperty m_persName
Name of the persistency service capable to write data from the store.
Definition: AthenaOutputStream.h:144
SG::DataProxy::clID
CLID clID() const
Retrieve clid.
AthenaOutputStream::m_writeMetadataAndDisconnect
bool m_writeMetadataAndDisconnect
Definition: AthenaOutputStream.h:189
AthenaOutputStream::m_extendProvenanceRecord
BooleanProperty m_extendProvenanceRecord
Set to false to omit adding the current DataHeader into the DataHeader history This will cause the in...
Definition: AthenaOutputStream.h:151
AthenaOutputStream::m_itemListFromTool
BooleanProperty m_itemListFromTool
Set to write out everything in input DataHeader.
Definition: AthenaOutputStream.h:154
AthenaOutputStream::m_transient
ToolHandle< SG::IFolder > m_transient
Decoded list of transient ids.
Definition: AthenaOutputStream.h:166
AthenaOutputStream::finalizeRange
void finalizeRange(const std::string &rangeFN)
Definition: AthenaOutputStream.cxx:298
AthenaOutputStream::m_incidentSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
Definition: AthenaOutputStream.h:107
SG::CompressionInfo
std::unordered_map< std::string, SG::ThinningInfo::compression_map_t > CompressionInfo
Map of compressed variables and their compression levels.
Definition: CompressionInfo.h:37
AthenaOutputStream::itemListHandler
void itemListHandler(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
Definition: AthenaOutputStream.cxx:863
xAOD::AuxSelection
Class helping in dealing with dynamic branch selection.
Definition: AuxSelection.h:31
AthenaOutputStream::handleVariableSelection
void handleVariableSelection(const SG::IConstAuxStore &auxstore, SG::DataProxy &itemProxy, const std::string &aux_attr, SG::SelectionVetoes &vetoes) const
Here we build the vetoed AuxIDs.
Definition: AthenaOutputStream.cxx:817
id
SG::auxid_t id
Definition: Control/AthContainers/Root/debug.cxx:239
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
AuxCompression.h
AthenaOutputStream::m_metadataStore
ServiceHandle< StoreGateSvc > m_metadataStore
Definition: AthenaOutputStream.h:100
AtlCoolConsole.tool
tool
Definition: AtlCoolConsole.py:452
AthenaOutputStream::io_reinit
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
Definition: AthenaOutputStream.cxx:893
AthenaOutputStream::m_tpCnvSvc
ServiceHandle< ITPCnvSvc > m_tpCnvSvc
Definition: AthenaOutputStream.h:106
SG::DataProxy::name
virtual const name_type & name() const override final
Retrieve data object key == string.
AthenaOutputStream::m_streamName
StringProperty m_streamName
Stream name (defaults to algorithm name)
Definition: AthenaOutputStream.h:112
AthenaOutputStream::m_compressionBitsLow
UnsignedIntegerProperty m_compressionBitsLow
Number of mantissa bits in the float compression.
Definition: AthenaOutputStream.h:134
AthenaOutputStream::m_altObjects
IDataSelector m_altObjects
Objects overridden by ‘exact’ handling.
Definition: AthenaOutputStream.h:175
python.LumiBlobConversion.pos
pos
Definition: LumiBlobConversion.py:16
IAthenaOutputStreamTool::commitOutput
virtual StatusCode commitOutput(bool doCommit=false)=0
Commit the output stream after having streamed out objects Must commitOutput AFTER streaming.
AthenaOutputStream::m_dictLoader
ServiceHandle< IDictLoaderSvc > m_dictLoader
Definition: AthenaOutputStream.h:105
AthenaOutputStream::m_metaDataSvc
ServiceHandle< MetaDataSvc > m_metaDataSvc
Handles to all the necessary services.
Definition: AthenaOutputStream.h:104
CLIDRegistry.h
a static registry of CLID->typeName entries. NOT for general use. Use ClassIDSvc instead.
AuxSelection.h
SG::DataProxy::hasAlias
bool hasAlias(const std::string &key) const
Test to see if a given string is in the alias set.
AthenaOutputStream::m_events
std::atomic< int > m_events
Number of events written to this output stream.
Definition: AthenaOutputStream.h:192
AthenaOutputStream::simpleMatch
bool simpleMatch(const std::string &pattern, const std::string &text)
Glob-style matcher, where the only meta-character is '*'.
Definition: AthenaOutputStream.cxx:940
AthenaOutputStream::m_compressionDecoderHigh
ToolHandle< SG::IFolder > m_compressionDecoderHigh
The top-level folder with items to be compressed high.
Definition: AthenaOutputStream.h:160
AthenaOutputStream::m_mutex
mutex_t m_mutex
mutex for this Stream write() and handle() methods
Definition: AthenaOutputStream.h:205
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
ITPCnvBase::persistentTInfo
virtual const std::type_info & persistentTInfo() const =0
return C++ type id of the persistent class this converter is for
AltDataBucket.h
AthenaOutputStream::execute
virtual StatusCode execute() override
Definition: AthenaOutputStream.cxx:406
TransientAddress.h
AthenaOutputStream::m_streamerMap
std::map< std::string, std::unique_ptr< IAthenaOutputStreamTool > > m_streamerMap
map of streamerTools handling event ranges in MT
Definition: AthenaOutputStream.h:202
IAuxStore.h
Interface for non-const operations on an auxiliary store.
makeTransCanvas.text
text
Definition: makeTransCanvas.py:11
SG::auxid_set_t
A set of aux data identifiers.
Definition: AuxTypes.h:47
merge.status
status
Definition: merge.py:16
StoreID::EVENT_STORE
@ EVENT_STORE
Definition: StoreID.h:26
AthenaOutputStream::write
virtual StatusCode write()
Stream the data.
Definition: AthenaOutputStream.cxx:442
AthenaOutputStream::m_pCLIDSvc
ServiceHandle< IClassIDSvc > m_pCLIDSvc
Definition: AthenaOutputStream.h:108
AthenaOutputStream::m_selVetoesKey
SG::WriteHandleKey< SG::SelectionVetoes > m_selVetoesKey
Key used for recording selected dynamic variable information to the event store.
Definition: AthenaOutputStream.h:223
IConstAuxStore.h
Interface for const operations on an auxiliary store.
SG::IConstAuxStore
Interface for const operations on an auxiliary store.
Definition: IConstAuxStore.h:64
GetLBsToIgnore.outputFN
outputFN
Definition: GetLBsToIgnore.py:224
python.PyAthena.obj
obj
Definition: PyAthena.py:132
MetaContBase::getAsVoid
virtual void * getAsVoid(const SourceID &sid) const =0
IAthenaOutputStreamTool::streamObjects
virtual StatusCode streamObjects(const TypeKeyPairs &typeKeys, const std::string &outputName="")=0
SG::DataProxy
Definition: DataProxy.h:45
Cut::all
@ all
Definition: SUSYToolsAlg.cxx:67
StoreGateSvc.h
SG::FolderItem
a Folder item (data object) is identified by the clid/key pair
Definition: SGFolderItem.h:24
python.TransformConfig.attributes
def attributes(self)
Definition: TransformConfig.py:383
SG::IConstAuxStore::getAuxIDs
virtual const SG::auxid_set_t & getAuxIDs() const =0
Return a set of identifiers for existing data items in this store.
SG::ConstProxyIterator
ProxyMap::const_iterator ConstProxyIterator
Definition: ProxyMap.h:28
match
bool match(std::string s1, std::string s2)
match the individual directories of two strings
Definition: hcg.cxx:356
AthenaOutputStream::handle
virtual void handle(const Incident &incident) override
Incident service handle listening for MetaDataStop.
Definition: AthenaOutputStream.cxx:223
AthenaOutputStream::m_p2BWritten
ToolHandle< SG::IFolder > m_p2BWritten
The top-level folder with items to be written.
Definition: AthenaOutputStream.h:157
fitman.k
k
Definition: fitman.py:528
AthenaOutputStream::clearSelection
void clearSelection()
Clear list of selected objects.
Definition: AthenaOutputStream.cxx:526
python.BeamSpotUpdate.compression
compression
Definition: BeamSpotUpdate.py:188
AthenaOutputStream::m_compressionDecoderLow
ToolHandle< SG::IFolder > m_compressionDecoderLow
The top-level folder with items to be compressed low.
Definition: AthenaOutputStream.h:163
ServiceHandle< IIoComponentMgr >
mapkey::key
key
Definition: TElectronEfficiencyCorrectionTool.cxx:37
AthenaOutputStream::~AthenaOutputStream
virtual ~AthenaOutputStream()
Standard Destructor.
Definition: AthenaOutputStream.cxx:67
CxxUtils::ConcurrentBitset::test
bool test(bit_t bit) const
Test to see if a bit is set.
AthenaOutputStream::m_compInfoKey
SG::WriteHandleKey< SG::CompressionInfo > m_compInfoKey
Key used for recording lossy float compressed variable information to the event store.
Definition: AthenaOutputStream.h:228
DataProxy.h