ATLAS Offline Software
AthenaOutputStream Class Reference

Algorithm that marks data objects in StoreGate (SG) for writing.

#include <AthenaOutputStream.h>


Public Types

typedef std::vector< SG::DataProxy * > Items
 
typedef std::vector< std::pair< std::string, std::string > > TypeKeyPairs
 

Public Member Functions

 AthenaOutputStream (const std::string &name, ISvcLocator *pSvcLocator)
 Standard algorithm Constructor.
 
virtual ~AthenaOutputStream ()
 Standard Destructor.
 

Protected Types

typedef ServiceHandle< IClassIDSvc > IClassIDSvc_t
 
typedef std::map< std::string, unsigned int > CounterMapType
 map to record number of writes per object
 
typedef std::recursive_mutex mutex_t
 mutex for this Stream write() and handle() methods
 

Protected Member Functions

void itemListHandler (Gaudi::Details::PropertyBase &)
 Handler for the ItemList property.
 
void excludeListHandler (Gaudi::Details::PropertyBase &)
 Handler for the ExcludeList property.
 
void compressionListHandlerHigh (Gaudi::Details::PropertyBase &)
 Handler for the CompressionListHigh property.
 
void compressionListHandlerLow (Gaudi::Details::PropertyBase &)
 Handler for the CompressionListLow property.
 

Protected Attributes

ServiceHandle< StoreGateSvc > m_dataStore
 handle to the StoreGateSvc store where the data we want to write out resides
 
ServiceHandle< StoreGateSvc > m_metadataStore
 
ServiceHandle< StoreGateSvc > * m_currentStore
 
ServiceHandle< IItemListSvc > m_itemSvc
 
ServiceHandle< MetaDataSvc > m_metaDataSvc
 
ServiceHandle< IDictLoaderSvc > m_dictLoader
 
ServiceHandle< ITPCnvSvc > m_tpCnvSvc
 
ServiceHandle< IIncidentSvc > m_incidentSvc
 
std::string m_persName
 Name of the persistency service capable of writing data from the store.
 
StringProperty m_writingTool
 Name of the OutputStreamTool used for writing.
 
std::string m_outputName
 Name of the output file.
 
std::string m_outputAttributes
 
IClassIDSvc_t m_pCLIDSvc
 
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
 
StringProperty m_streamName {this, "StreamName", "", "name of the output stream"}
 Stream name (defaults to algorithm name)
 
StringArrayProperty m_itemList {this,"ItemList",{},"List of items to write","OutputStreamItemList"}
 Vector of item names.
 
StringArrayProperty m_metadataItemList {this,"MetadataItemList",{},"List of metadata items to write","OutputStreamItemList"}
 Vector of item names.
 
StringArrayProperty m_excludeList {this,"ExcludeList",{},"List of metadata items to write","OrderedSet<std::string>"}
 Vector of item names.
 
StringArrayProperty m_compressionListHigh
 Vector of item names.
 
StringArrayProperty m_compressionListLow
 Vector of item names.
 
unsigned int m_compressionBitsHigh
 Number of mantissa bits in the float compression.
 
unsigned int m_compressionBitsLow
 Number of mantissa bits in the float compression.
 
StringArrayProperty m_transientItems
 List of items that are known to be present in the transient store (and hence we can make input dependencies on them).
 
ToolHandle< SG::IFolder > m_p2BWritten
 the top-level folder with items to be written
 
ToolHandle< SG::IFolder > m_decoder
 the folder with items to be excluded (filled from m_excludeList)
 
ToolHandle< SG::IFolder > m_compressionDecoderHigh
 the top-level folder with items to be compressed high
 
ToolHandle< SG::IFolder > m_compressionDecoderLow
 the top-level folder with items to be compressed low
 
ToolHandle< SG::IFolder > m_transient
 Decoded list of transient ids.
 
std::multimap< CLID, std::string > m_CLIDKeyPairs
 map of (clid,key) pairs to be excluded (comes from m_excludeList)
 
IDataSelector m_objects
 Collection of objects being selected.
 
IDataSelector m_altObjects
 Objects overridden by ‘exact’ handling.
 
std::vector< std::unique_ptr< DataObject > > m_ownedObjects
 Collection of DataObject instances owned by this service.
 
std::atomic< int > m_events
 Number of events written to this output stream.
 
bool m_forceRead
 set to true to force read of data objects in item list
 
bool m_extendProvenanceRecord
 set to false to omit adding the current DataHeader into the DataHeader history; this will cause the input file to be neglected for back navigation (replace mode).
 
bool m_writeOnExecute
 set to true to trigger streaming of data on execute()
 
bool m_writeOnFinalize
 set to true to trigger streaming of data on finalize()
 
bool m_itemListFromTool
 set to write out everything in input DataHeader
 
bool m_checkNumberOfWrites
 set to true to check for number of times each object is written
 
CounterMapType m_objectWriteCounter
 
ToolHandle< IAthenaOutputStreamTool > m_streamer
 pointer to AthenaOutputStreamTool
 
ToolHandleArray< IAthenaOutputTool > m_helperTools
 vector of AlgTools that are executed by this stream
 
bool m_writeMetadataAndDisconnect = false
 
std::map< unsigned, std::string > m_slotRangeMap
 map of filenames assigned to active slots
 
std::map< std::string, std::string > m_rangeIDforRangeFN
 map of RangeIDs (as used by the Sequencer) for each Range filename generated
 
std::map< std::string, std::unique_ptr< IAthenaOutputStreamTool > > m_streamerMap
 map of streamerTools handling event ranges in MT
 
mutex_t m_mutex
 

Private Attributes

SG::WriteHandleKey< SG::SelectionVetoes > m_selVetoesKey { this, "SelVetoesKey", "" }
 Key used for recording selected dynamic variable information to the event store.
 
SG::WriteHandleKey< SG::CompressionInfo > m_compInfoKey { this, "CompInfoKey", "" }
 Key used for recording lossy float compressed variable information to the event store.
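
The "number of mantissa bits" settings (m_compressionBitsHigh, m_compressionBitsLow; the constructor listing sets them to 7 and 15) control how much precision a lossy compressed float keeps. The sketch below only illustrates the idea with plain truncation on an IEEE-754 single-precision value. The helper name truncateMantissa is hypothetical, and the compression actually applied by Athena is implemented elsewhere and may differ, for example by rounding instead of truncating.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

static_assert(sizeof(float) == sizeof(std::uint32_t),
              "sketch assumes a 32-bit IEEE-754 float");

// Hypothetical helper: keep only the `nmantissa` most significant mantissa
// bits of a float by zeroing the remaining low bits (truncation, no rounding).
inline float truncateMantissa(float value, unsigned int nmantissa) {
  assert(nmantissa >= 1 && nmantissa <= 23);
  std::uint32_t bits = 0;
  std::memcpy(&bits, &value, sizeof(bits));       // well-defined bit access
  const std::uint32_t dropped = 23u - nmantissa;  // number of low bits to clear
  const std::uint32_t mask = ~((std::uint32_t{1} << dropped) - 1u);
  bits &= mask;                                   // sign and exponent untouched
  std::memcpy(&value, &bits, sizeof(value));
  return value;
}
```

Fewer mantissa bits mean coarser values but more repeated zero bits for the file compressor to exploit, which is why the "high" compression list uses the smaller bit count.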
 

implement IAlgorithm

virtual StatusCode initialize () override
 
virtual StatusCode finalize () override
 
virtual StatusCode execute () override
 
virtual StatusCode stop () override
 
virtual StatusCode write ()
 Stream the data.
 
void clearSelection ()
 Clear list of selected objects.
 
StatusCode collectAllObjects ()
 Collect data objects for output streamer list.
 
IDataSelector * selectedObjects ()
 Return the list of selected objects.
 
virtual void handle (const Incident &incident) override
 Incident service handle listening for MetaDataStop.
 
virtual StatusCode io_reinit () override
 Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon fork(2)).
 
virtual StatusCode io_finalize () override
 
StatusCode addItemObjects (const SG::FolderItem &, SG::SelectionVetoes &vetoes, SG::CompressionInfo &compInfo)
 Add item data objects to output streamer list.
 
void handleVariableSelection (const SG::IConstAuxStore &auxstore, SG::DataProxy &itemProxy, const std::string &tns, const std::string &aux_attr, SG::SelectionVetoes &vetoes) const
 
void tokenizeAtSep (std::vector< std::string > &, const std::string &, const std::string &) const
 tokenize a string based on a substring
 
bool matchKey (const std::vector< std::string > &key, const std::string &proxyName) const
 Try to match a DataProxy name to a vector of strings.
 
void writeMetaData (const std::string &outputFN="")
 Write MetaData for this stream (by default) or for a substream outputFN (in ES mode)
 
std::set< std::string > buildCompressionSet (const ToolHandle< SG::IFolder > &handle, const CLID &item_id, const std::string &item_key) const
 Helper function for building the compression lists.
 
void finalizeRange (const std::string &rangeFN)
 
void loadDict (CLID clid)
 Helper function to load dictionaries (both transient and persistent) for a given type.
 

Detailed Description

Algorithm that marks data objects in StoreGate (SG) for writing.

Author
srinir@bnl.gov

Definition at line 54 of file AthenaOutputStream.h.

Member Typedef Documentation

◆ CounterMapType

typedef std::map<std::string, unsigned int> AthenaOutputStream::CounterMapType
protected

map to record number of writes per object

Definition at line 142 of file AthenaOutputStream.h.

◆ IClassIDSvc_t

typedef ServiceHandle<IClassIDSvc> AthenaOutputStream::IClassIDSvc_t
protected

Definition at line 83 of file AthenaOutputStream.h.

◆ Items

typedef std::vector<SG::DataProxy*> AthenaOutputStream::Items

Definition at line 58 of file AthenaOutputStream.h.

◆ mutex_t

typedef std::recursive_mutex AthenaOutputStream::mutex_t
protected

mutex for this Stream write() and handle() methods

Definition at line 163 of file AthenaOutputStream.h.
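
The handle() listing notes that its mutex is shared with write(), which is reached from writeMetaData() while handle() still holds the lock. That nesting is the usual reason for choosing std::recursive_mutex. A minimal sketch of the pattern, using a hypothetical StreamSketch class rather than the real algorithm:

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for the stream; only the locking pattern is shown.
class StreamSketch {
public:
  using mutex_t = std::recursive_mutex;

  // Takes the lock, then does the (here trivial) writing work.
  int write() {
    std::lock_guard<mutex_t> lock(m_mutex);
    return ++m_writes;
  }

  // Takes the lock and calls write() while still holding it.
  // With a plain std::mutex this nested lock on the same thread would be
  // undefined behaviour; a recursive mutex allows the re-entry.
  int handle() {
    std::lock_guard<mutex_t> lock(m_mutex);
    return write();
  }

private:
  mutex_t m_mutex;
  int m_writes = 0;
};
```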

◆ TypeKeyPairs

typedef std::vector<std::pair<std::string, std::string> > AthenaOutputStream::TypeKeyPairs

Definition at line 188 of file AthenaOutputStream.h.

Constructor & Destructor Documentation

◆ AthenaOutputStream()

AthenaOutputStream::AthenaOutputStream ( const std::string &  name,
ISvcLocator *  pSvcLocator 
)

Standard algorithm Constructor.

Definition at line 156 of file AthenaOutputStream.cxx.

157  : base_class(name, pSvcLocator),
158  m_dataStore("StoreGateSvc", name),
159  m_metadataStore("MetaDataStore", name),
161  m_itemSvc("ItemListSvc", name),
162  m_metaDataSvc("MetaDataSvc", name),
163  m_dictLoader("AthDictLoaderSvc", name),
164  m_tpCnvSvc("AthTPCnvSvc", name),
165  m_incidentSvc("IncidentSvc", name),
167  m_pCLIDSvc("ClassIDSvc", name),
168  m_outSeqSvc("OutputStreamSequencerSvc", name),
169  m_p2BWritten(string("SG::Folder/") + name + string("_TopFolder"), this),
170  m_decoder(string("SG::Folder/") + name + string("_excluded"), this),
171  m_compressionDecoderHigh(string("SG::Folder/") + name + string("_compressed_high"), this),
172  m_compressionDecoderLow(string("SG::Folder/") + name + string("_compressed_low"), this),
173  m_transient(string("SG::Folder/") + name + string("_transient"), this),
174  m_events(0),
175  m_streamer(string("AthenaOutputStreamTool/") + name + string("Tool"), this),
176  m_helperTools(this)
177 {
178  assert(pSvcLocator);
179  declareProperty("TransientItems", m_transientItems);
180  declareProperty("OutputFile", m_outputName="DidNotNameOutput.root");
181  declareProperty("EvtConversionSvc", m_persName="EventPersistencySvc");
182  declareProperty("WritingTool", m_streamer);
183  declareProperty("Store", m_dataStore);
184  declareProperty("MetadataStore", m_metadataStore);
185  declareProperty("ForceRead", m_forceRead=true);
186  declareProperty("ExtendProvenanceRecord", m_extendProvenanceRecord=true);
187  declareProperty("WriteOnExecute", m_writeOnExecute=true);
188  declareProperty("WriteOnFinalize", m_writeOnFinalize=false);
189  declareProperty("TakeItemsFromInput", m_itemListFromTool=false);
190  declareProperty("CheckNumberOfWrites", m_checkNumberOfWrites=true);
191  declareProperty("HelperTools", m_helperTools);
192  declareProperty("CompressionListHigh", m_compressionListHigh);
193  declareProperty("CompressionListLow", m_compressionListLow);
194  declareProperty("CompressionBitsHigh", m_compressionBitsHigh = 7);
195  declareProperty("CompressionBitsLow", m_compressionBitsLow = 15);
196 
197  // Associate action handlers with the AcceptAlgs,
198  // RequireAlgs & VetoAlgs properties
199  m_itemList.declareUpdateHandler(&AthenaOutputStream::itemListHandler, this);
200  m_excludeList.declareUpdateHandler(&AthenaOutputStream::excludeListHandler, this);
203 }

◆ ~AthenaOutputStream()

AthenaOutputStream::~AthenaOutputStream ( )
virtual

Standard Destructor.

Definition at line 206 of file AthenaOutputStream.cxx.

206  {
207  m_streamerMap.clear();
208 }

Member Function Documentation

◆ addItemObjects()

StatusCode AthenaOutputStream::addItemObjects ( const SG::FolderItem item,
SG::SelectionVetoes vetoes,
SG::CompressionInfo compInfo 
)
private

Add item data objects to output streamer list.

Handle variable selections. Both variable selection and lossy float compression are limited to event data for the time being.

Definition at line 769 of file AthenaOutputStream.cxx.

772 {
773  // anything after a dot is a list of dynamic Aux attributes, separated by dots
774  size_t dotpos = item.key().find('.');
775  string item_key, aux_attr;
776  if( dotpos != string::npos ) {
777  item_key = item.key().substr(0, dotpos+1);
778  aux_attr = item.key().substr(dotpos+1);
779  } else {
780  item_key = item.key();
781  }
782  CLID item_id = item.id();
783  ATH_MSG_DEBUG("addItemObjects(" << item_id << ",\"" << item_key << "\") called");
784  ATH_MSG_DEBUG(" Key:" << item_key );
785  if( aux_attr.size() ) {
786  ATH_MSG_DEBUG(" Aux Attr:" << aux_attr );
787  }
788  static const std::string wildCard = "*";
789  std::set<std::string> clidKeys;
790  for (SG::IFolder::const_iterator iter = m_decoder->begin(), iterEnd = m_decoder->end();
791  iter != iterEnd; ++iter) {
792  if (iter->id() == item_id) {
793  clidKeys.insert(iter->key());
794  }
795  }
796 
797  // Here we build the list of attributes for the lossy float compression
798  // Note that we do not allow m_compressionBitsHigh >= m_compressionBitsLow
799  // Otherwise is, in any case, a logical error and they'd potentially overwrite each other
800  std::map< unsigned int, std::set< std::string > > comp_attr_map;
801  comp_attr_map[ m_compressionBitsHigh ] = buildCompressionSet( m_compressionDecoderHigh, item_id, item_key );
802  comp_attr_map[ m_compressionBitsLow ] = buildCompressionSet( m_compressionDecoderLow, item_id, item_key );
803 
804  // Print some debugging information regarding the lossy float compression configuration
805  for( const auto& it : comp_attr_map ) {
806  ATH_MSG_DEBUG(" Comp Attr " << it.second.size() << " with " << it.first << " mantissa bits.");
807  if ( it.second.size() > 0 ) {
808  for( const auto& attr : it.second ) {
809  ATH_MSG_DEBUG(" >> " << attr);
810  }
811  }
812  }
813 
814  // For MetaData objects of type T that are kept in MetaContainers get the MetaCont<T> ID
815  const CLID remapped_item_id = m_metaDataSvc->remapMetaContCLID( item_id );
817  SG::ProxyMap map;
818  bool gotProxies = false;
819  // Look for the clid in storegate
820  SG::DataProxy* match = (*m_currentStore)->proxy(remapped_item_id, item_key, true);
821  if (match != nullptr) {
822  map.insert({item_key, match});
823  iter = map.begin();
824  end = map.end();
825  gotProxies = true;
826  }
827  // Look for the clid in storegate
828  if (!gotProxies && ((*m_currentStore)->proxyRange(remapped_item_id, iter, end)).isSuccess()) {
829  gotProxies = true;
830  }
831  if (gotProxies) {
832  bool added = false, removed = false;
833  // For item list entry
834  // Check for wildcard within string, i.e. 'xxx*yyy', and save the matching parts
835  std::vector<std::string> keyTokens;
836  keyTokens.reserve(2);
837  std::vector<std::string> xkeyTokens;
838  xkeyTokens.reserve(2);
839  ATH_MSG_VERBOSE("Calling tokenizeAtStep( " << keyTokens << ", " << item_key << ", " << wildCard << ")" );
840  this->tokenizeAtSep( keyTokens, item_key, wildCard );
841  ATH_MSG_VERBOSE("Done calling tokenizeAtStep( " << keyTokens << ", " << item_key << ", " << wildCard << ")" );
842  // Now loop over any found proxies
843  for (; iter != end; ++iter) {
844  SG::DataProxy* itemProxy(iter->second);
845  string proxyName = itemProxy->name();
846  string stream;
847  if( m_currentStore == &m_metadataStore ) {
848  // only check metadata keys
849  stream = m_metaDataSvc->removeStreamFromKey(proxyName); // can modify proxyName
850  }
851  // Does this key match the proxy key name - allow for wildcarding and aliases
852  bool keyMatch = ( item_key == "*" ||
853  item_key == proxyName ||
854  itemProxy->hasAlias(item_key) );
855  if (!keyMatch) {
856  ATH_MSG_VERBOSE("Calling matchKey( " << keyTokens << ", " << proxyName << ")" );
857  keyMatch = matchKey(keyTokens, proxyName);
858  ATH_MSG_VERBOSE("Done calling matchKey( " << keyTokens << ", " << proxyName << ") with result: " << keyMatch );
859  }
860 
861  // Now undo the flag based on a similar analysis of excluded wildcard keys
862  bool xkeyMatch = false;
863  for (std::set<std::string>::const_iterator c2k_it = clidKeys.begin(), c2k_itEnd = clidKeys.end();
864  keyMatch && c2k_it != c2k_itEnd; ++c2k_it) {
865  if (*c2k_it == wildCard) {
866  xkeyMatch = true; // wildcard first
867  } else {
868  // Look for wildcard in key
869  std::string::size_type xsep = c2k_it->find(wildCard);
870  // If wildcard not found, then check whether the second is an excluded key
871  if (xsep == std::string::npos) {
872  if (*c2k_it == proxyName) {
873  xkeyMatch = true;
874  }
875  } else { // Otherwise take before and after wildcard for later use
876  this->tokenizeAtSep( xkeyTokens, *c2k_it, wildCard );
877  ATH_MSG_DEBUG("x Proxy name=" << proxyName );
878  xkeyMatch = matchKey(xkeyTokens, proxyName);
879  }
880  }
881  }
882  if( !stream.empty() and stream != m_outputName ) {
883  // reject keys that are marked for a different output stream
884  ATH_MSG_DEBUG("Rejecting key: " << itemProxy->name() << " in output: " << m_outputName);
885  xkeyMatch = true;
886  }
887  // All right, it passes key match find in itemList, but not in excludeList
888  if (keyMatch && !xkeyMatch) {
889  if (m_forceRead && itemProxy->isValid()) {
890  if (nullptr == itemProxy->accessData()) {
891  ATH_MSG_ERROR(" Could not get data object for id " << remapped_item_id << ",\"" << proxyName);
892  }
893  }
894  if (nullptr != itemProxy->object()) {
895  if( std::find(m_objects.begin(), m_objects.end(), itemProxy->object()) == m_objects.end() &&
896  std::find(m_altObjects.begin(), m_altObjects.end(), itemProxy->object()) == m_altObjects.end() )
897  {
898  if( item_id != remapped_item_id ) {
899  // For MetaCont<T>: -
900  // create a temporary DataObject for an entry in the container to pass to CnvSvc
901  DataBucketBase* dbb = static_cast<DataBucketBase*>( itemProxy->object() );
902  const MetaContBase* metaCont = static_cast<MetaContBase*>( dbb->cast( ClassID_traits<MetaContBase>::ID() ) );
903  void* obj = metaCont? metaCont->getAsVoid( m_outSeqSvc->currentRangeID() ) : nullptr;
904  if( obj ) {
905  auto altbucket = std::make_unique<AltDataBucket>(
906  obj, item_id, *CLIDRegistry::CLIDToTypeinfo(item_id), proxyName );
907  m_objects.push_back( altbucket.get() );
908  m_ownedObjects.push_back( std::move(altbucket) );
909  m_altObjects.push_back( itemProxy->object() ); // only for duplicate prevention
910  } else {
911  ATH_MSG_ERROR("Failed to retrieve object from MetaCont with key=" << item_key << ", for EventRangeID=" << m_outSeqSvc->currentRangeID() );
912  return StatusCode::FAILURE;
913  }
914  } else if (item.exact()) {
915  // If the exact flag is set, make a new DataObject
916  // holding the object as the requested type.
917  DataBucketBase* dbb = dynamic_cast<DataBucketBase*> (itemProxy->object());
918  if (!dbb) std::abort();
919  void* ptr = dbb->cast (item_id);
920  if (!ptr) {
921  // Hard cast
922  ptr = dbb->object();
923  }
924  auto altbucket =
925  std::make_unique<AltDataBucket>
926  (ptr, item_id,
927  *CLIDRegistry::CLIDToTypeinfo (item_id),
928  *itemProxy);
929  m_objects.push_back(altbucket.get());
930  m_ownedObjects.push_back (std::move(altbucket));
931  m_altObjects.push_back (itemProxy->object());
932  }
933  else
934  m_objects.push_back(itemProxy->object());
935  ATH_MSG_DEBUG(" Added object " << item_id << ",\"" << proxyName << "\"");
936  }
937 
938  // Build ItemListSvc string
939  std::string tn;
940  std::stringstream tns;
941  if (!m_pCLIDSvc->getTypeNameOfID(item_id, tn).isSuccess()) {
942  ATH_MSG_ERROR(" Could not get type name for id "
943  << item_id << ",\"" << proxyName);
944  tns << item_id << '_' << proxyName;
945  } else {
946  tn += '_' + proxyName;
947  tns << tn;
948  }
949 
953  if ((*m_currentStore)->storeID() == StoreID::EVENT_STORE &&
954  item_key.find( RootAuxDynIO::AUX_POSTFIX ) == ( item_key.size() - 4 )) {
955 
956  const SG::IConstAuxStore* auxstore( nullptr );
957  try {
958  SG::fromStorable( itemProxy->object(), auxstore, true );
959  } catch( const std::exception& ) {
960  ATH_MSG_DEBUG( "Error in casting object with CLID "
961  << itemProxy->clID() << " to SG::IConstAuxStore*" );
962  auxstore = nullptr;
963  }
964 
965  if (auxstore) {
966  handleVariableSelection (*auxstore, *itemProxy,
967  tns.str(), aux_attr,
968  vetoes);
969 
970  // Here comes the compression logic using ThinningInfo
971  // Get a hold of all AuxIDs for this store (static, dynamic etc.)
972  const SG::auxid_set_t allVars = auxstore->getAuxIDs();
973 
974  // Get a handle on the compression information for this store
975  std::string key = item_key;
976  key.erase (key.size()-4, 4);
977 
978  // Build the compression list, retrieve the relevant AuxIDs and
979  // store it in the relevant map that is going to be inserted into
980  // the ThinningCache later on by the ThinningCacheTool
982  compression.setCompressedAuxIDs( comp_attr_map );
983  for( const auto& it : compression.getCompressedAuxIDs( allVars ) ) {
984  if( it.second.size() > 0 ) { // insert only if the set is non-empty
985  compInfo[ key ][ it.first ] = it.second;
986  ATH_MSG_DEBUG( "Container " << key << " has " << it.second.size() <<
987  " variables that'll be lossy float compressed"
988  " with " << it.first << " mantissa bits" );
989  }
990  } // End of loop over variables to be lossy float compressed
991  } // End of lossy float compression logic
992 
993  }
994 
995  added = true;
996  if (m_checkNumberOfWrites) {
998  if (cit == m_objectWriteCounter.end()) {
999  // First time through
1000  m_objectWriteCounter.insert(CounterMapType::value_type(tn, 1));
1001  } else {
1002  // set to next iteration (to avoid double counting)
1003  // StreamTools will eliminate duplicates.
1004  (*cit).second = m_events + 1;
1005  }
1006  }
1007  if (m_itemSvc->addStreamItem(this->name(),tns.str()).isFailure()) {
1008  ATH_MSG_WARNING("Unable to record item " << tns.str() << " in Svc");
1009  }
1010  }
1011  } else if (keyMatch && xkeyMatch) {
1012  removed = true;
1013  }
1014  } // proxy loop
1015  if (!added && !removed) {
1016  ATH_MSG_DEBUG(" No object matching " << item_id << ",\"" << item_key << "\" found");
1017  } else if (removed) {
1018  ATH_MSG_DEBUG(" Object being excluded based on property setting "
1019  << item_id << ",\"" << item_key << "\". Skipping");
1020  }
1021  } else {
1022  ATH_MSG_DEBUG(" Failed to receive proxy iterators from StoreGate for "
1023  << item_id << ",\"" << item_key << "\". Skipping");
1024  }
1025  return StatusCode::SUCCESS;
1026 }
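
The listing above handles item keys of the form 'xxx*yyy' by splitting the key at the wildcard and matching the pieces against each proxy name. The bodies of tokenizeAtSep() and matchKey() are not shown on this page, so the following is only a sketch of one plausible behaviour. The helpers splitAtSep and matchTokens are hypothetical names, and the real functions may differ, for instance in how they anchor the first and last token.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: split `text` at every occurrence of the substring
// `sep`, dropping empty pieces.
std::vector<std::string> splitAtSep(const std::string& text, const std::string& sep) {
  assert(!sep.empty());
  std::vector<std::string> tokens;
  std::string::size_type start = 0;
  while (true) {
    const std::string::size_type pos = text.find(sep, start);
    const std::string piece =
        text.substr(start, pos == std::string::npos ? std::string::npos : pos - start);
    if (!piece.empty()) tokens.push_back(piece);
    if (pos == std::string::npos) break;
    start = pos + sep.size();
  }
  return tokens;
}

// Hypothetical helper: true if all tokens occur in `name` in the given order
// without overlapping. This check is unanchored: text before the first token
// or after the last one is accepted.
bool matchTokens(const std::vector<std::string>& tokens, const std::string& name) {
  std::string::size_type from = 0;
  for (const std::string& tok : tokens) {
    const std::string::size_type pos = name.find(tok, from);
    if (pos == std::string::npos) return false;
    from = pos + tok.size();
  }
  return true;
}
```

With these helpers, the key "Muon*Aux." splits into "Muon" and "Aux.", which matches a name such as "MuonSegmentsAux." but not "ElectronsAux.".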

◆ buildCompressionSet()

std::set< std::string > AthenaOutputStream::buildCompressionSet ( const ToolHandle< SG::IFolder > &  handle,
const CLID item_id,
const std::string &  item_key 
) const
private

Helper function for building the compression lists.

Here we build the list of attributes for the float compression. CompressionList follows the same logic as the ItemList. We find the matching keys, read the string after "Aux.", tokenize by "." and build an std::set of these to be communicated to ThinningInfo elsewhere in the code.

Definition at line 1034 of file AthenaOutputStream.cxx.

1037 {
1038  // Create an empty result
1039  std::set<std::string> result;
1040 
1041  // Check the item is indeed Aux.
1042  if(item_key.find("Aux.") == string::npos) {
1043  return result;
1044  }
1045 
1046  // First the high compression list
1047  for (SG::IFolder::const_iterator iter = handle->begin(), iterEnd = handle->end();
1048  iter != iterEnd; ++iter) {
1049  // First match the IDs for early rejection.
1050  if (iter->id() != item_id) {
1051  continue;
1052  }
1053  // Then find the compression item key and the compression list string
1054  size_t seppos = iter->key().find('.');
1055  string comp_item_key{""}, comp_str{""};
1056  if(seppos != string::npos) {
1057  comp_item_key = iter->key().substr(0, seppos+1);
1058  comp_str = iter->key().substr(seppos+1);
1059  } else {
1060  comp_item_key = iter->key();
1061  }
1062  // Proceed only if the keys match and the
1063  // compression list string is not empty
1064  if (!comp_str.empty() && comp_item_key == item_key) {
1065  std::stringstream ss(comp_str);
1066  std::string attr;
1067  while( std::getline(ss, attr, '.') ) {
1068  result.insert(attr);
1069  }
1070  }
1071  }
1072 
1073  // All done, return the result
1074  return result;
1075 }
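
The same selection logic can be tried outside Athena. The sketch below mirrors the listing above but replaces SG::IFolder with a plain vector of (ID, key) pairs. The Entry alias and the function name compressionAttrs are hypothetical and exist only for illustration.

```cpp
#include <cassert>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a folder entry: (class ID, key string).
using Entry = std::pair<unsigned int, std::string>;

std::set<std::string> compressionAttrs(const std::vector<Entry>& folder,
                                       unsigned int item_id,
                                       const std::string& item_key) {
  std::set<std::string> result;
  if (item_key.find("Aux.") == std::string::npos) return result;  // not an aux store
  for (const Entry& e : folder) {
    if (e.first != item_id) continue;                     // early rejection on ID
    const std::string::size_type sep = e.second.find('.');
    if (sep == std::string::npos) continue;               // no attribute list given
    const std::string key = e.second.substr(0, sep + 1);  // keeps the dot: "FooAux."
    const std::string attrs = e.second.substr(sep + 1);   // e.g. "pt.eta"
    if (attrs.empty() || key != item_key) continue;
    std::stringstream ss(attrs);
    std::string attr;
    while (std::getline(ss, attr, '.')) result.insert(attr);  // one entry per attribute
  }
  return result;
}
```

For a folder entry (1, "FooAux.pt.eta") and the item (1, "FooAux."), the result is the set {"eta", "pt"}. Note that the container key is compared with its trailing dot included.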

◆ clearSelection()

void AthenaOutputStream::clearSelection ( )

Clear list of selected objects.

Definition at line 701 of file AthenaOutputStream.cxx.

701  {
702  m_objects.erase(m_objects.begin(), m_objects.end());
703  m_ownedObjects.clear();
704  m_altObjects.clear();
705 }

◆ collectAllObjects()

StatusCode AthenaOutputStream::collectAllObjects ( )

Collect data objects for output streamer list.

Definition at line 707 of file AthenaOutputStream.cxx.

707  {
708  if (m_itemListFromTool) {
709  if (!m_streamer->getInputItemList(&*m_p2BWritten).isSuccess()) {
710  ATH_MSG_WARNING("collectAllObjects() could not get ItemList from Tool.");
711  }
712  }
713 
714  auto vetoes = std::make_unique<SG::SelectionVetoes>();
715  auto compInfo = std::make_unique<SG::CompressionInfo>();
716 
717  m_p2BWritten->updateItemList(true);
718  std::vector<CLID> folderclids;
719  // Collect all objects that need to be persistified:
720  //FIXME refactor: move this in folder. Treat as composite
721  for (SG::IFolder::const_iterator i = m_p2BWritten->begin(), iEnd = m_p2BWritten->end(); i != iEnd; ++i) {
722  ATH_CHECK( addItemObjects(*i, *vetoes, *compInfo) );
723  folderclids.push_back(i->id());
724  }
725 
726  // FIXME This is a bruteforce hack to remove items erroneously
727  // added somewhere in the morass of the addItemObjects logic
728  IDataSelector prunedList;
729  for (auto it = m_objects.begin(); it != m_objects.end(); ++it) {
730  if (std::find(folderclids.begin(),folderclids.end(),(*it)->clID())!=folderclids.end()) {
731  if (SG::isTransientKey ((*it)->name())) {
732  ATH_MSG_ERROR("Request to write transient object key " <<
733  (*it)->name() << " ignored");
734  }
735  else {
736  prunedList.push_back(*it); // build new list that is correct
737  }
738  }
739  else {
740  ATH_MSG_DEBUG("Object " << (*it)->clID() <<","<< (*it)->name() << " found that was not in itemlist");
741  }
742  }
743  m_objects.clear(); // clear previous list
744  for (auto it = prunedList.begin(); it != prunedList.end(); ++it) {
745  if ((*it)->name().length() > 4 && (*it)->name().substr((*it)->name().length() - 4) == "Aux.") {
746  m_objects.push_back(*it); // first copy aux store new into previous
747  }
748  }
749  for (auto it = prunedList.begin(); it != prunedList.end(); ++it) {
750  if ((*it)->name().length() <= 4 || (*it)->name().substr((*it)->name().length() - 4) != "Aux.") {
751  m_objects.push_back(*it); // then copy others new into previous
752  }
753  }
754 
755  // If there were any variable selections, record the information in SG.
756  if (!vetoes->empty()) {
757  ATH_CHECK( SG::makeHandle (m_selVetoesKey).record (std::move (vetoes)) );
758  }
759 
760  // Store the lossy float compression information in the SG.
761  if (!compInfo->empty()) {
762  ATH_CHECK( SG::makeHandle (m_compInfoKey).record (std::move (compInfo)) );
763  }
764 
765  return StatusCode::SUCCESS;
766 }
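
The two copy loops near the end of the listing above reorder the pruned list so that objects whose names end in "Aux." come first and all others follow, each group keeping its original order. The same ordering can be expressed with std::stable_partition. This is a sketch on plain strings, with hypothetical helper names, and is not how the Athena code is written:

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// True if `name` is longer than four characters and ends with "Aux.",
// matching the length and suffix test used in the listing.
bool isAuxStoreName(const std::string& name) {
  static const std::string suffix = "Aux.";
  return name.size() > suffix.size() &&
         name.compare(name.size() - suffix.size(), suffix.size(), suffix) == 0;
}

// Move aux-store names to the front, preserving relative order in both groups.
void auxStoresFirst(std::vector<std::string>& names) {
  std::stable_partition(names.begin(), names.end(), isAuxStoreName);
}
```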

◆ compressionListHandlerHigh()

void AthenaOutputStream::compressionListHandlerHigh ( Gaudi::Details::PropertyBase &  )
protected

Handler for the CompressionListHigh property.

Definition at line 1148 of file AthenaOutputStream.cxx.

1148  {
1149  IProperty *pAsIProp(nullptr);
1150  if ((m_compressionDecoderHigh.retrieve()).isFailure() ||
1151  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_compressionDecoderHigh)) ||
1152  (pAsIProp->setProperty("ItemList", m_compressionListHigh.toString())).isFailure()) {
1153  throw GaudiException("Folder property [ItemList] not found", name(), StatusCode::FAILURE);
1154  }
1155 }

◆ compressionListHandlerLow()

void AthenaOutputStream::compressionListHandlerLow ( Gaudi::Details::PropertyBase &  )
protected

Handler for the CompressionListLow property.

Definition at line 1157 of file AthenaOutputStream.cxx.

1157  {
1158  IProperty *pAsIProp(nullptr);
1159  if ((m_compressionDecoderLow.retrieve()).isFailure() ||
1160  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_compressionDecoderLow)) ||
1161  (pAsIProp->setProperty("ItemList", m_compressionListLow.toString())).isFailure()) {
1162  throw GaudiException("Folder property [ItemList] not found", name(), StatusCode::FAILURE);
1163  }
1164 }

◆ excludeListHandler()

void AthenaOutputStream::excludeListHandler ( Gaudi::Details::PropertyBase &  )
protected

Handler for the ExcludeList property.

Definition at line 1139 of file AthenaOutputStream.cxx.

1139  {
1140  IProperty *pAsIProp(nullptr);
1141  if ((m_decoder.retrieve()).isFailure() ||
1142  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_decoder)) ||
1143  (pAsIProp->setProperty("ItemList", m_excludeList.toString())).isFailure()) {
1144  throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
1145  }
1146 }

◆ execute()

StatusCode AthenaOutputStream::execute ( )
overridevirtual

Reimplemented in AthenaConditionStream.

Definition at line 559 of file AthenaOutputStream.cxx.

559  {
560  bool failed = false;
561  for (ToolHandle<IAthenaOutputTool>& tool : m_helperTools) {
562  if (!tool->preExecute().isSuccess()) {
563  failed = true;
564  }
565  }
566  // Test whether this event should be output
567  if (m_writeOnExecute && isEventAccepted()) {
568  if (write().isFailure()) {
569  failed = true;
570  }
571  }
572  for (ToolHandle<IAthenaOutputTool>& tool : m_helperTools) {
573  if(!tool->postExecute().isSuccess()) {
574  failed = true;
575  }
576  }
578  writeMetaData();
580  // finalize will disconnect output
581  if( !finalize().isSuccess() ) {
582  failed = true;
583  }
584  }
585  if (failed) {
586  return(StatusCode::FAILURE);
587  }
588  return(StatusCode::SUCCESS);
589 }
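
execute() does not return at the first error. It records the failure in a flag, still runs the remaining helper tools, and reports the overall status at the end. A minimal sketch of that control flow, with a hypothetical runAll helper and bool standing in for StatusCode:

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical helper: run every step even if an earlier one fails,
// then report whether all of them succeeded.
bool runAll(const std::vector<std::function<bool()>>& steps) {
  bool failed = false;
  for (const auto& step : steps) {
    if (!step()) failed = true;  // remember the failure, keep going
  }
  return !failed;
}
```

The benefit is that every tool gets its post-processing call even when an earlier step has failed, at the cost of not knowing from the return value alone which step failed.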

◆ finalize()

StatusCode AthenaOutputStream::finalize ( )
overridevirtual

Reimplemented in AthenaConditionStream.

Definition at line 533 of file AthenaOutputStream.cxx.

534 {
535  bool failed = false;
536  ATH_MSG_DEBUG("finalize: Optimize output");
537  // Connect the output file to the service
538  if (!m_streamer->finalizeOutput().isSuccess()) {
539  failed = true;
540  }
541  ATH_MSG_DEBUG("finalize: end optimize output");
542  if (!m_helperTools.release().isSuccess()) {
543  failed = true;
544  }
545  if (!m_streamer.release().isSuccess()) {
546  failed = true;
547  }
548  if (failed) {
549  return(StatusCode::FAILURE);
550  }
551  m_objects.clear();
552  m_objects.shrink_to_fit();
553  m_ownedObjects.clear();
554  m_altObjects.clear();
555  return(StatusCode::SUCCESS);
556 }

◆ finalizeRange()

void AthenaOutputStream::finalizeRange ( const std::string &  rangeFN)
private

Definition at line 444 of file AthenaOutputStream.cxx.

445 {
446  ATH_MSG_DEBUG("Writing MetaData to " << rangeFN);
447  // MN: not calling StopMetaData Incident here but directly writeMetaData() - OK for Sim, check others
448  // metadata tools like CutFlowSvc are not able to handle this yet
449  const std::string rememberID = m_outSeqSvc->setRangeID( m_rangeIDforRangeFN[ rangeFN ] );
450  writeMetaData( rangeFN );
451  m_outSeqSvc->setRangeID( rememberID );
452 
453  ATH_MSG_INFO("Finished writing Event Sequence to " << rangeFN);
454  auto strm_iter = m_streamerMap.find( rangeFN );
455  strm_iter->second->finalizeOutput().ignore();
456  strm_iter->second->finalize().ignore();
457  m_streamerMap.erase( strm_iter );
458  m_outSeqSvc->publishRangeReport( rangeFN );
459 }

◆ handle()

void AthenaOutputStream::handle ( const Incident &  incident)
overridevirtual

Incident service handle listening for MetaDataStop.

Definition at line 368 of file AthenaOutputStream.cxx.

369 {
370  ATH_MSG_DEBUG("handle() incident type: " << inc.type());
371  // mutex shared with write() which is called from writeMetaData
372  std::unique_lock<mutex_t> lock(m_mutex);
373 
374  if( inc.type() == "MetaDataStop" ) {
375  if( m_outSeqSvc->inUse() ) {
376  if( m_outSeqSvc->inConcurrentEventsMode() ) {
377  // EventService MT - write metadata and close all remaining substreams
378  while( m_streamerMap.size() > 0 ) {
379  finalizeRange( m_streamerMap.begin()->first );
380  }
381  return;
382  }
383  if( m_outSeqSvc->lastIncident() == "EndEvent" ) {
384  // in r22 EndEvent comes before output writing
385  // - queue metadata writing and disconnect for after Event write
387  return;
388  }
389  }
390  // not in Event Service
391  writeMetaData();
392  }
393  else if( m_outSeqSvc->inUse() ) {
394  // Handle Event Ranges for Event Service
395  EventContext::ContextID_t slot = inc.context().slot();
396  if( slot == EventContext::INVALID_CONTEXT_ID ) {
397  throw GaudiException("Received Incident with invalid slot in ES mode", name(), StatusCode::FAILURE);
398  }
399  auto count_events_in_range = [&](const std::string& range) {
400  return std::count_if(m_slotRangeMap.cbegin(), m_slotRangeMap.cend(),
401  [&](auto& el){return el.second == range;} );
402  };
403  if( inc.type() == IncidentType::BeginProcessing ) {
404  // get the current/old range filename for this slot
405  const std::string rangeFN = m_slotRangeMap[ slot ];
406  // build the new range filename for this slot
407  const std::string newRangeFN = m_outSeqSvc->buildSequenceFileName( m_outputName );
408  if( !rangeFN.empty() and rangeFN != newRangeFN ) {
409  ATH_MSG_INFO("Slot range change: '" << rangeFN << "' -> '" << newRangeFN << "'");
410  ATH_MSG_DEBUG("There are " << m_slotRangeMap.size() << " slots in use");
411  for(auto range : m_slotRangeMap ) {
412  ATH_MSG_DEBUG("Slot: " << range.first << " FN=" << range.second);
413  }
414  if( count_events_in_range(rangeFN) == 1 ) {
415  finalizeRange( rangeFN );
416  }
417  }
418  ATH_MSG_INFO("slot " << slot << " processing event in range: " << newRangeFN);
419  m_slotRangeMap[ slot ] = newRangeFN;
420  // remember the RangeID for this slot so we can write metadata *after* a range change
421  m_rangeIDforRangeFN[ newRangeFN ] = m_outSeqSvc->currentRangeID();
422  }
423  else if( inc.type() == IncidentType::EndProcessing ) {
424  ATH_MSG_DEBUG("There are " << m_slotRangeMap.size() << " slots in use");
425  for(auto range : m_slotRangeMap ) {
426  ATH_MSG_DEBUG("Slot: " << range.first << " FN=" << range.second);
427  }
428  if( m_slotRangeMap.size() > 1 ) {
429  // if there are multiple slots, we can detect if the range ended with this event
430  // - except the last range, because there is no next range to clear the slot map
431  const std::string rangeFN = m_slotRangeMap[ slot ];
432  if( count_events_in_range(rangeFN) == 1 ) {
433  finalizeRange( rangeFN );
434  m_slotRangeMap[ slot ].clear();
435  }
436  }
437  }
438  }
439  ATH_MSG_DEBUG("Leaving incident handler for " << inc.type());
440 }
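
The range handling above finalizes a range only when the slot being handled is the last one still assigned to that range file. The bookkeeping can be sketched in isolation as below; `SlotRangeMap`, `countSlotsInRange` and `isLastSlotInRange` are illustrative names, and the file names used with them are made up.

```cpp
#include <algorithm>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical stand-in for m_slotRangeMap: slot number -> range file name.
using SlotRangeMap = std::map<unsigned, std::string>;

// Count how many slots are currently assigned to the given range file,
// mirroring the count_events_in_range lambda in the listing.
std::ptrdiff_t countSlotsInRange(const SlotRangeMap& slots,
                                 const std::string& range) {
  return std::count_if(slots.cbegin(), slots.cend(),
                       [&](const auto& el) { return el.second == range; });
}

// True when `slot` is the only slot still holding its range, i.e. the
// range could be finalized once this slot moves on.
bool isLastSlotInRange(const SlotRangeMap& slots, unsigned slot) {
  const auto it = slots.find(slot);
  if (it == slots.end() || it->second.empty()) {
    return false;  // unknown slot, or slot not assigned to any range
  }
  return countSlotsInRange(slots, it->second) == 1;
}
```

Unlike `m_slotRangeMap[ slot ]` in the listing, this sketch uses `find()` so that a lookup never inserts a new empty entry.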

◆ handleVariableSelection()

void AthenaOutputStream::handleVariableSelection ( const SG::IConstAuxStore auxstore,
SG::DataProxy itemProxy,
const std::string &  tns,
const std::string &  aux_attr,
SG::SelectionVetoes vetoes 
) const
private

Definition at line 1077 of file AthenaOutputStream.cxx.

1082 {
1083  // collect dynamic Aux selection (parse the line, attributes separated by dot)
1084  std::set<std::string> attributes;
1085  if( aux_attr.size() ) {
1086  std::stringstream ss(aux_attr);
1087  std::string attr;
1088  while( std::getline(ss, attr, '.') ) {
1089  attributes.insert(attr);
1090  std::stringstream temp;
1091  temp << tns << attr;
1092  if (m_itemSvc->addStreamItem(this->name(),temp.str()).isFailure()) {
1093  ATH_MSG_WARNING("Unable to record item " << temp.str() << " in Svc");
1094  }
1095  }
1096  }
1097 
1098  // Return early if there's no selection.
1099  if (attributes.empty()) {
1100  return;
1101  }
1102 
1103  std::string key = itemProxy.name();
1104  if (key.size() >= 4 && key.compare (key.size()-4, 4, "Aux.")==0)
1105  {
1106  key.erase (key.size()-4, 4);
1107  }
1108 
1109  // Find the entry for the selection.
1110  SG::auxid_set_t& vset = vetoes[key];
1111 
1112  // Form the veto mask for this object.
1114  sel.selectAux (attributes);
1115 
1116  // Get all the AuxIDs that we know of and the selected ones
1117  SG::auxid_set_t all = auxstore.getAuxIDs();
1118  SG::auxid_set_t selected = sel.getSelectedAuxIDs( all );
1119 
1120  // Loop over all and build a list of vetoed AuxIDs from non selected ones
1121  for( const SG::auxid_t auxid : all ) {
1122  if ( !selected.test( auxid ) ) {
1123  vset.insert( auxid );
1124  }
1125  }
1126 }
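
The three steps in the listing above (split the dot-separated selection, strip a trailing `Aux.` from the key, veto every known variable that was not selected) can be sketched with standard containers only. The function names and the use of `std::set<std::size_t>` in place of `SG::auxid_set_t` are assumptions for illustration; the real selection logic lives in the helper object `sel` used in the listing.

```cpp
#include <cstddef>
#include <set>
#include <sstream>
#include <string>

// Split a dot-separated attribute string into a set of names.
std::set<std::string> parseAttributes(const std::string& aux_attr) {
  std::set<std::string> attributes;
  std::stringstream ss(aux_attr);
  std::string attr;
  while (std::getline(ss, attr, '.')) {
    attributes.insert(attr);
  }
  return attributes;
}

// Remove a trailing "Aux." from a store key, if present.
std::string stripAuxSuffix(std::string key) {
  const std::string suffix = "Aux.";
  if (key.size() >= suffix.size() &&
      key.compare(key.size() - suffix.size(), suffix.size(), suffix) == 0) {
    key.erase(key.size() - suffix.size());
  }
  return key;
}

// Everything that is known but not selected ends up in the veto set.
// std::size_t is a stand-in for SG::auxid_t here.
std::set<std::size_t> buildVetoes(const std::set<std::size_t>& all,
                                  const std::set<std::size_t>& selected) {
  std::set<std::size_t> vetoes;
  for (const std::size_t id : all) {
    if (selected.count(id) == 0) {
      vetoes.insert(id);
    }
  }
  return vetoes;
}
```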

◆ initialize()

StatusCode AthenaOutputStream::initialize ( )
overridevirtual

Definition at line 211 of file AthenaOutputStream.cxx.

211  {
213  ATH_MSG_DEBUG("In initialize");
214  // Reset the number of events written
215  m_events = 0;
216 
217  // set up the SG service:
218  ATH_CHECK( m_dataStore.retrieve() );
219  ATH_MSG_DEBUG("Found " << m_dataStore.typeAndName() << " store.");
220  assert(static_cast<bool>(m_dataStore));
221  if (!m_metadataItemList.value().empty()) {
222  ATH_CHECK( m_metadataStore.retrieve() );
223  ATH_MSG_DEBUG("Found " << m_metadataStore.typeAndName() << " store.");
224  assert(static_cast<bool>(m_metadataStore));
225  }
226 
227  // set up the CLID service:
228  ATH_CHECK( m_pCLIDSvc.retrieve() );
229 
230  ATH_CHECK( m_dictLoader.retrieve() );
231  ATH_CHECK( m_tpCnvSvc.retrieve() );
232 
233  // set up the ItemListSvc service:
234  assert(static_cast<bool>(m_pCLIDSvc));
235  ATH_CHECK( m_itemSvc.retrieve() );
236  assert(static_cast<bool>(m_itemSvc));
237 
238  // set up the OutputStreamSequencer service:
239  ATH_CHECK( m_outSeqSvc.retrieve() );
240  assert(static_cast<bool>(m_outSeqSvc));
241 
242  // Get Output Stream tool for writing
243  ATH_CHECK( m_streamer.retrieve() );
244  ATH_CHECK( m_streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord) );
245 
246  ATH_CHECK( m_helperTools.retrieve() );
247  ATH_MSG_INFO("Found " << m_helperTools << endmsg << "Data output: " << m_outputName);
248 
249  for (ToolHandle<IAthenaOutputTool>& tool : m_helperTools) {
250  ATH_CHECK( tool->postInitialize() );
251  }
252 
253  // Register this algorithm for 'I/O' events
254  ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
255  ATH_CHECK( iomgr.retrieve() );
256  ATH_CHECK( iomgr->io_register(this) );
257  ATH_CHECK( iomgr->io_register(this, IIoComponentMgr::IoMode::WRITE, m_outputName) );
258  ATH_CHECK( this->io_reinit() );
259 
260  // Add an explicit input dependency for everything in our item list
261  // that we know from the configuration is in the transient store.
262  // We don't want to add everything on the list, because configurations
263  // often initialize this with a maximal static list of everything
264  // that could possibly be written.
265  {
266  ATH_CHECK( m_transient.retrieve() );
267  IProperty *pAsIProp = dynamic_cast<IProperty*> (&*m_transient);
268  if (!pAsIProp) {
269  ATH_MSG_FATAL ("Bad folder interface");
270  return StatusCode::FAILURE;
271  }
272  ATH_CHECK (pAsIProp->setProperty("ItemList", m_transientItems.toString()));
273 
274  for (const SG::FolderItem& item : *m_p2BWritten) {
275  // Load ROOT dictionaries now.
276  loadDict (item.id());
277 
278  const std::string& k = item.key();
279  if (k.find('*') != std::string::npos) continue;
280  if (k.find('.') != std::string::npos) continue;
281  for (const SG::FolderItem& titem : *m_transient) {
282  if (titem.id() == item.id() && titem.key() == k) {
283  DataObjID id (item.id(), m_dataStore.name() + "+" + k);
284  this->addDependency (id, Gaudi::DataHandle::Reader);
285  break;
286  }
287  }
288  }
289  m_transient->clear();
290  }
291 
292  // Also load dictionaries for metadata classes.
293  if (!m_metadataItemList.value().empty()) {
294  IProperty *pAsIProp = dynamic_cast<IProperty*> (&*m_transient);
295  if (!pAsIProp) {
296  ATH_MSG_FATAL ("Bad folder interface");
297  return StatusCode::FAILURE;
298  }
299  ATH_CHECK (pAsIProp->setProperty("ItemList", m_metadataItemList.toString()));
300  for (const SG::FolderItem& item : *m_transient) {
301  loadDict (item.id());
302  }
303  m_transient->clear();
304  }
305 
306  // Also make sure we have the dictionary for Token.
307  m_dictLoader->load_type ("Token");
308 
309  // listen to event range incidents if incident name is configured
310  ATH_CHECK( m_incidentSvc.retrieve() );
311  if( !m_outSeqSvc->incidentName().empty() ) {
312  // use priority 95 to make sure the Output Sequencer goes first (it has priority 100)
313  m_incidentSvc->addListener(this, IncidentType::BeginProcessing, 95);
314  m_incidentSvc->addListener(this, IncidentType::EndProcessing, 95);
315  }
316 
317  // Check compression settings and print some information about the configuration
318  // Both should be between [5, 23] and high compression should be < low compression
319  if(m_compressionBitsHigh < 5 || m_compressionBitsHigh > 23) {
320  ATH_MSG_INFO("Float compression mantissa bits for high compression " <<
321  "(" << m_compressionBitsHigh << ") is outside the allowed range of [5, 23].");
322  ATH_MSG_INFO("Setting it to the appropriate limit.");
324  }
325  if(m_compressionBitsLow < 5 || m_compressionBitsLow > 23) {
326  ATH_MSG_INFO("Float compression mantissa bits for low compression " <<
327  "(" << m_compressionBitsLow << ") is outside the allowed range of [5, 23].");
328  ATH_MSG_INFO("Setting it to the appropriate limit.");
330  }
332  ATH_MSG_ERROR("Float compression mantissa bits for low compression " <<
333  "(" << m_compressionBitsLow << ") is lower than or equal to high compression " <<
334  "(" << m_compressionBitsHigh << ")! Please check the configuration! ");
335  return StatusCode::FAILURE;
336  }
337  if(m_compressionListHigh.value().empty() && m_compressionListLow.value().empty()) {
338  ATH_MSG_VERBOSE("Both high and low float compression lists are empty. Float compression will NOT be applied.");
339  } else {
340  ATH_MSG_INFO("Either high or low (or both) float compression lists are defined. Float compression will be applied.");
341  ATH_MSG_INFO("High compression will use " << m_compressionBitsHigh << " mantissa bits, and " <<
342  "low compression will use " << m_compressionBitsLow << " mantissa bits.");
343  }
344 
345  // Setup stream name
346  if (m_streamName.empty()) {
347  m_streamName.setValue(this->name());
348  }
349 
350  // Set SG key for selected variable information.
351  m_selVetoesKey = "SelectionVetoes_" + m_streamName;
353 
354  m_compInfoKey = "CompressionInfo_" + m_streamName;
356 
357  ATH_MSG_DEBUG("End initialize");
358  return StatusCode::SUCCESS;
359 }
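
The messages in the compression check above describe two rules: each setting is pulled back into the range [5, 23], and the high-compression setting must use strictly fewer mantissa bits than the low-compression one. The sketch below is one way to express those rules; it is inferred from the log messages only, and `checkCompressionBits` is a hypothetical helper rather than the code used in `initialize()`.

```cpp
#include <algorithm>
#include <optional>
#include <utility>

// Hypothetical helper: returns the (high, low) mantissa-bit settings after
// limiting each to [5, 23], or std::nullopt if the pair is inconsistent.
std::optional<std::pair<unsigned, unsigned>>
checkCompressionBits(unsigned high, unsigned low) {
  constexpr unsigned kMinBits = 5;
  constexpr unsigned kMaxBits = 23;
  // Assumption: "setting it to the appropriate limit" means clamping.
  high = std::clamp(high, kMinBits, kMaxBits);
  low = std::clamp(low, kMinBits, kMaxBits);
  // High compression keeps fewer bits, so it must be strictly below low.
  if (low <= high) {
    return std::nullopt;
  }
  return std::make_pair(high, low);
}
```

For example, under these assumptions a request of 2 and 30 bits would be adjusted to 5 and 23, while 10 and 10 would be rejected.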

◆ io_finalize()

StatusCode AthenaOutputStream::io_finalize ( )
overridevirtual

Definition at line 1248 of file AthenaOutputStream.cxx.

1248  {
1249  ATH_MSG_INFO("I/O finalization...");
1250  for (ToolHandle<IAthenaOutputTool>& tool : m_helperTools) {
1251  if (!tool->preFinalize().isSuccess()) {
1252  ATH_MSG_ERROR("Cannot finalize helper tool");
1253  }
1254  }
1255  const Incident metaDataStopIncident(name(), "MetaDataStop");
1256  this->handle(metaDataStopIncident);
1257  m_incidentSvc->removeListener(this, "MetaDataStop");
1258  if (m_dataStore->clearStore().isFailure()) {
1259  ATH_MSG_WARNING("Cannot clear the DataStore");
1260  }
1261  return StatusCode::SUCCESS;
1262 }

◆ io_reinit()

StatusCode AthenaOutputStream::io_reinit ( )
overridevirtual

Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon fork(2))

Definition at line 1231 of file AthenaOutputStream.cxx.

1231  {
1232  ATH_MSG_INFO("I/O reinitialization...");
1233  // For 'write on finalize', we set up listener for 'MetaDataStop'
1234  // and perform write at this point. This happens at 'stop' of the
1235  // event selector. RDS 04/2010
1236  // Set to be listener for end of event
1237  m_incidentSvc->removeListener(this, "MetaDataStop"); // Remove any existing listener to avoid handling the incident multiple times
1238  m_incidentSvc->addListener(this, "MetaDataStop", 50);
1239  for (ToolHandle<IAthenaOutputTool>& tool : m_helperTools) {
1240  if (!tool->postInitialize().isSuccess()) {
1241  ATH_MSG_ERROR("Cannot initialize helper tool");
1242  }
1243  }
1244  return StatusCode::SUCCESS;
1245 }

◆ itemListHandler()

void AthenaOutputStream::itemListHandler ( Gaudi::Details::PropertyBase &  )
protected

Handler for ItemNames Property.

Definition at line 1129 of file AthenaOutputStream.cxx.

1129  {
1130  // Assuming concrete SG::Folder also has an itemList property
1131  IProperty *pAsIProp(nullptr);
1132  if ((m_p2BWritten.retrieve()).isFailure() ||
1133  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
1134  (pAsIProp->setProperty(m_itemList)).isFailure()) {
1135  throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
1136  }
1137 }

◆ loadDict()

void AthenaOutputStream::loadDict ( CLID  clid)
private

Helper function to load dictionaries (both transient and persistent) for a given type.

We want to do this explicitly during initialization to avoid sporadic failures seen when loading dictionaries while multiple threads are running. See ATEAM-697 and ATEAM-749.

Definition at line 1270 of file AthenaOutputStream.cxx.

1271 {
1272  m_dictLoader->load_type (clid);
1273 
1274  // Also load the persistent class dictionary, if applicable.
1275  std::unique_ptr<ITPCnvBase> tpcnv = m_tpCnvSvc->t2p_cnv_unique (clid);
1276  if (tpcnv) {
1277  m_dictLoader->load_type (tpcnv->persistentTInfo());
1278  }
1279 }

◆ matchKey()

bool AthenaOutputStream::matchKey ( const std::vector< std::string > &  key,
const std::string &  proxyName 
) const
private

Try to match a DataProxy name to a vector of strings.

Definition at line 1188 of file AthenaOutputStream.cxx.

1189  {
1190  bool keyMatch = true; // default return
1191 
1192  // Get an iterator to the first (not zeroth!) string in the vector
1193  std::vector<std::string>::const_iterator itrEnd = key.cend();
1194  std::vector<std::string>::const_iterator itr = key.cbegin();
1195 
1196  // Walk through the whole proxyName string and try to match to all sub-keys
1197  // We are using that: std::string::npos!=string.find("") is always true
1198  std::string::size_type proxyNamePos=0;
1199  while ( itr != itrEnd &&
1200  std::string::npos != ( proxyNamePos = proxyName.find(*itr, proxyNamePos) )
1201  ) {
1202  // If we are at the begin iterator and the first element is Not an empty string
1203  ATH_MSG_VERBOSE("If we are at the begin iterator and the first element is Not an empty string");
1204  if ( !(key.front().empty()) && itr == key.cbegin() && proxyNamePos != 0 ) {
1205  // We had to match a precise name at the beginning, but didn't find it at the beginning
1206  ATH_MSG_VERBOSE("We had to match a precise name at the beginning, but didn't find it at the beginning");
1207  break;
1208  }
1209  // If we are at the end iterator and the last element is Not an empty string
1210  if ( !(key.back().empty()) && itr == --(key.cend()) && (proxyNamePos+itr->size()!=proxyName.size()) ) {
1211  // We had to match a precise name at the end, but didn't find it at the end
1212  ATH_MSG_VERBOSE("We had to match a precise name at the end, but didn't find it at the end");
1213  break;
1214  }
1215  ATH_MSG_VERBOSE("Found a match of subkey: " << *itr << " in string: " << proxyName
1216  << " at position: " << proxyNamePos );
1217  // If we have a good match, increment the iterator and the search position
1218  proxyNamePos += itr->size();
1219  ++itr;
1220  }
1221  // Didn't find everything
1222  if ( itr != itrEnd ) {
1223  keyMatch = false;
1224  ATH_MSG_VERBOSE("Couldn't match every sub-string... return: " << keyMatch);
1225  }
1226  else { ATH_MSG_VERBOSE("Did match every sub-string... return: " << keyMatch); }
1227 
1228  return(keyMatch);
1229 }
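
The matching loop above can be tried outside of Athena with the following sketch, which keeps the same logic but drops the logging. It assumes the sub-keys were produced by splitting a wildcard pattern, so an empty first or last element means the pattern started or ended with a wildcard; `matchSubKeys` is an illustrative name.

```cpp
#include <iterator>
#include <string>
#include <vector>

// Sketch of the sub-key matching: every sub-key must be found in order.
// A non-empty first sub-key must sit at the very start of the name, and a
// non-empty last sub-key must end exactly at the end of the name.
bool matchSubKeys(const std::vector<std::string>& key,
                  const std::string& name) {
  auto itr = key.cbegin();
  std::string::size_type pos = 0;
  while (itr != key.cend() &&
         (pos = name.find(*itr, pos)) != std::string::npos) {
    if (!key.front().empty() && itr == key.cbegin() && pos != 0) {
      break;  // anchored at the start, but found later in the name
    }
    if (!key.back().empty() && itr == std::prev(key.cend()) &&
        pos + itr->size() != name.size()) {
      break;  // anchored at the end, but the match ends too early
    }
    pos += itr->size();  // continue searching after this match
    ++itr;
  }
  return itr == key.cend();  // true only if every sub-key was matched
}
```

With sub-keys `{"Track", ""}` (from a pattern such as `Track*`) the name `TrackParticles` matches and `MyTrack` does not. An empty vector matches any name, as in the listing, where `keyMatch` defaults to true.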

◆ selectedObjects()

IDataSelector* AthenaOutputStream::selectedObjects ( )
inline

Return the list of selected objects.

Definition at line 209 of file AthenaOutputStream.h.

209  {
210  return &m_objects;
211  }

◆ stop()

StatusCode AthenaOutputStream::stop ( )
overridevirtual

Definition at line 361 of file AthenaOutputStream.cxx.

362 {
363  ATH_MSG_DEBUG("AthenaOutputStream " << this->name() << " ::stop()");
364  return StatusCode::SUCCESS;
365 }

◆ tokenizeAtSep()

void AthenaOutputStream::tokenizeAtSep ( std::vector< std::string > &  subStrings,
const std::string &  portia,
const std::string &  sepstr 
) const
private

tokenize a string based on a substring

Definition at line 1166 of file AthenaOutputStream.cxx.

1168  {
1169  subStrings.clear(); // clear from previous iteration step
1170  // If the portia starts with a wildcard, add an empty string
1171  if (portia.starts_with( sepstr )) {
1172  subStrings.push_back("");
1173  }
1174  boost::char_separator<char> csep(sepstr.c_str());
1175  boost::tokenizer<char_separator<char>> tokens(portia, csep);
1176  for (const std::string& t : tokens) {
1177  //ATH_MSG_VERBOSE("Now on token: " << t);
1178  subStrings.push_back(t);
1179  }
1180  // If the portia ends with a wildcard, add an empty string
1181  if ( portia.size() >= sepstr.size() &&
1182  portia.compare( portia.size() - sepstr.size(), sepstr.size(), sepstr) == 0 ) {
1183  subStrings.push_back("");
1184  }
1185  return;
1186 }
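
A standard-library-only sketch of the same tokenization is shown below. It assumes a single-character separator (the listing takes a string and uses Boost), drops empty tokens between separators as `boost::char_separator` does by default, and adds an empty string for a leading or trailing separator. `splitAtWildcard` is a hypothetical name.

```cpp
#include <string>
#include <vector>

// Split `text` at `sep`. A leading or trailing separator is recorded as an
// empty string so that a later matching step can tell "Track*" from "Track".
std::vector<std::string> splitAtWildcard(const std::string& text, char sep) {
  std::vector<std::string> parts;
  if (!text.empty() && text.front() == sep) {
    parts.emplace_back();  // pattern starts with the separator
  }
  std::string::size_type start = 0;
  while ((start = text.find_first_not_of(sep, start)) != std::string::npos) {
    const std::string::size_type end = text.find(sep, start);
    // If end is npos, substr() simply takes the rest of the string.
    parts.push_back(text.substr(start, end - start));
    if (end == std::string::npos) {
      break;
    }
    start = end;
  }
  if (!text.empty() && text.back() == sep) {
    parts.emplace_back();  // pattern ends with the separator
  }
  return parts;
}
```

For the pattern `Track*` this yields `"Track"` followed by an empty string, and for `*Aux.` an empty string followed by `"Aux."`.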

◆ write()

StatusCode AthenaOutputStream::write ( )
virtual

Stream the data.

Definition at line 592 of file AthenaOutputStream.cxx.

592  {
593  bool failed = false;
594  IAthenaOutputStreamTool* streamer = &*m_streamer;
595  std::string outputFN;
596 
597  std::unique_lock<mutex_t> lock(m_mutex);
598  outputFN = m_outSeqSvc->buildSequenceFileName( m_outputName );
599 
600  // Handle Event Ranges
601  if( m_outSeqSvc->inUse() and m_outSeqSvc->inConcurrentEventsMode() ) {
602  ATH_MSG_DEBUG( "Writing event sequence to " << outputFN );
603  streamer = m_streamerMap[ outputFN ].get();
604  if( !streamer ) {
605  // new range, needs a new streamer tool
606  IAlgTool* st = AlgTool::Factory::create( m_streamer->type(), m_streamer->type(), m_streamer->name(), this ).release();
607  st->addRef();
608  streamer = dynamic_cast<IAthenaOutputStreamTool*>( st );
609  IProperty *mstreamer_props = dynamic_cast<IProperty*> (&*m_streamer);
610  IProperty *streamer_props = dynamic_cast<IProperty*> (&*streamer);
611  for ( const auto& prop : mstreamer_props->getProperties() ) {
612  ATH_CHECK( streamer_props->setProperty( *prop ) );
613  }
614  if( !streamer or streamer->initialize().isFailure()
615  or streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord).isFailure() ) {
616  ATH_MSG_FATAL("Unable to initialize OutputStreamTool for " << outputFN );
617  return StatusCode::FAILURE;
618  }
619  m_streamerMap[ outputFN ].reset( streamer );
620  }
621  }
622 
623  // Clear any previously existing item list
624  clearSelection();
626 
627  // keep a local copy of the object lists so they are not overwritten when we release the lock
628  IDataSelector objects = std::move( m_objects );
629  IDataSelector altObjects = std::move( m_altObjects );
630  std::vector<std::unique_ptr<DataObject> > ownedObjects = std::move( m_ownedObjects );
631 
632  // print out info about objects collected
633  if (m_checkNumberOfWrites) {
634  bool checkCountError = false;
635  ATH_MSG_DEBUG(" Collected objects:");
636  bool first = true;
637  unsigned int lastCount = 0;
639  clast = m_objectWriteCounter.end(); cit != clast; ++cit) {
640  bool isError = false;
641  if (first) {
642  lastCount = (*cit).second;
643  first = false;
644  } else if (lastCount != (*cit).second) {
645  isError = true;
646  //Complain, but don't abort
647  checkCountError = true;
648  }
649  if (isError) {
650  ATH_MSG_ERROR(" INCORRECT Object/count: "
651  << (*cit).first << ", " << (*cit).second << " should be: " << lastCount);
652  } else {
653  ATH_MSG_DEBUG(" Object/count: " << (*cit).first << ", " << (*cit).second);
654  }
655  }
656  if (checkCountError) {
657  ATH_MSG_FATAL("Check number of writes failed. See messages above "
658  "to identify which container is not always written");
659  return(StatusCode::FAILURE);
660  }
661  }
662  // prepare before releasing lock because m_outputAttributes change in metadataStop
663  const std::string connectStr = outputFN + m_outputAttributes;
664 
665  for (ToolHandle<IAthenaOutputTool>& tool : m_helperTools) {
666  ATH_CHECK( tool->preStream() );
667  }
668 
669  // MN: would be nice to release the Stream lock here
670  // lock.unlock();
671 
672  // Connect the output file to the service
673  if (!streamer->connectOutput(connectStr).isSuccess()) {
674  ATH_MSG_FATAL("Could not connectOutput");
675  return StatusCode::FAILURE;
676  }
677  ATH_MSG_DEBUG("connectOutput done for " + outputFN);
678  StatusCode currentStatus = streamer->streamObjects(objects, connectStr);
679  // Do final check of streaming
680  if (!currentStatus.isSuccess()) {
681  if (!currentStatus.isRecoverable()) {
682  ATH_MSG_FATAL("streamObjects failed.");
683  failed = true;
684  } else {
685  ATH_MSG_DEBUG("streamObjects failed.");
686  }
687  }
688  bool doCommit = false;
689  if (!streamer->commitOutput(doCommit).isSuccess()) {
690  ATH_MSG_FATAL("commitOutput failed.");
691  failed = true;
692  }
693  if (failed) {
694  return(StatusCode::FAILURE);
695  }
696  m_events++;
697  return(StatusCode::SUCCESS);
698 }
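
The write-count check in the listing above compares every counter against the count of the first entry and flags any that differ. A minimal sketch of that comparison, assuming the same map type as `CounterMapType`, could be written as follows; `findInconsistentCounts` is an illustrative name.

```cpp
#include <map>
#include <string>
#include <vector>

// Same shape as CounterMapType: object name -> number of writes.
using CounterMap = std::map<std::string, unsigned int>;

// Return the names of all objects whose write count differs from the count
// of the first entry in the (name-ordered) map.
std::vector<std::string> findInconsistentCounts(const CounterMap& counters) {
  std::vector<std::string> inconsistent;
  if (counters.empty()) {
    return inconsistent;
  }
  const unsigned int expected = counters.begin()->second;
  for (const auto& [name, count] : counters) {
    if (count != expected) {
      inconsistent.push_back(name);
    }
  }
  return inconsistent;
}
```

Because the reference value is simply the first entry, a wrong count on that first entry causes all other entries to be reported instead.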

◆ writeMetaData()

void AthenaOutputStream::writeMetaData ( const std::string &  outputFN = "")
private

Write MetaData for this stream (by default) or for a substream outputFN (in ES mode)

Definition at line 465 of file AthenaOutputStream.cxx.

466 {
467  // use main stream tool by default, or per outputFile in ES mode
468  IAthenaOutputStreamTool* streamer = outputFN.empty()? &*m_streamer : m_streamerMap[outputFN].get();
469 
470  for (ToolHandle<IAthenaOutputTool>& tool : m_helperTools) {
471  if (!tool->preFinalize().isSuccess()) {
472  throw GaudiException("Cannot finalize helper tool", name(), StatusCode::FAILURE);
473  }
474  }
475  if( m_metaDataSvc->prepareOutput(outputFN).isFailure() ) {
476  throw GaudiException("Failed on MetaDataSvc prepareOutput", name(), StatusCode::FAILURE);
477  }
478  // lock all metadata to prevent updates during writing
480 
481  // Always force a final commit in stop - mainly applies to AthenaPool
482  if (m_writeOnFinalize) {
483  if (write().isFailure()) { // true mean write AND commit
484  throw GaudiException("Cannot write on finalize", name(), StatusCode::FAILURE);
485  }
486  ATH_MSG_INFO("Records written: " << m_events);
487  }
488  // Prepare the WriteDataHeaderForms incident
489  std::string DHFWriteIncidentfileName = m_outSeqSvc->buildSequenceFileName(m_outputName);
490  // remove technology from the name
491  size_t pos = DHFWriteIncidentfileName.find(':');
492  if( pos != std::string::npos ) DHFWriteIncidentfileName = DHFWriteIncidentfileName.substr(pos+1);
493  FileIncident incident(name(), "WriteDataHeaderForms", DHFWriteIncidentfileName);
494  m_incidentSvc->fireIncident(incident);
495 
496  ATH_MSG_DEBUG("metadataItemList: " << m_metadataItemList.value() );
497  if (!m_metadataItemList.value().empty()) {
499  StatusCode status = streamer->connectServices(m_metadataStore.typeAndName(), m_persName, false);
500  if (status.isFailure()) {
501  throw GaudiException("Unable to connect metadata services", name(), StatusCode::FAILURE);
502  }
503  m_checkNumberOfWrites = false;
504  m_outputAttributes = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
505  m_p2BWritten->clear();
506  IProperty *pAsIProp(nullptr);
507  if ((m_p2BWritten.retrieve()).isFailure() ||
508  nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
509  (pAsIProp->setProperty("ItemList", m_metadataItemList.toString())).isFailure()) {
510  throw GaudiException("Folder property [metadataItemList] not found", name(), StatusCode::FAILURE);
511  }
512  if (write().isFailure()) {
513  throw GaudiException("Cannot write metadata", name(), StatusCode::FAILURE);
514  }
515  FileIncident incident(name(), "WriteDataHeaderForms", DHFWriteIncidentfileName + m_outputAttributes);
516  m_incidentSvc->fireIncident(incident);
517 
518  m_outputAttributes.clear();
521  if (status.isFailure()) {
522  throw GaudiException("Unable to re-connect services", name(), StatusCode::FAILURE);
523  }
524  m_p2BWritten->clear();
525  if ((pAsIProp->setProperty(m_itemList)).isFailure()) {
526  throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
527  }
528  ATH_MSG_INFO("Metadata records written: " << m_events);
529  }
530 }
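
Before firing the `WriteDataHeaderForms` incident, the listing above removes the technology prefix from the output name by cutting at the first colon. As a standalone sketch (the function name and the example file names are illustrative):

```cpp
#include <string>

// Drop everything up to and including the first ':' in an output name.
// Names without a colon are returned unchanged.
std::string stripTechnology(const std::string& fileName) {
  const std::string::size_type pos = fileName.find(':');
  if (pos == std::string::npos) {
    return fileName;
  }
  return fileName.substr(pos + 1);
}
```

Only the first colon is considered, so any further colons in the remaining name are preserved.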

Member Data Documentation

◆ m_altObjects

IDataSelector AthenaOutputStream::m_altObjects
protected

Objects overridden by ‘exact’ handling.

Definition at line 121 of file AthenaOutputStream.h.

◆ m_checkNumberOfWrites

bool AthenaOutputStream::m_checkNumberOfWrites
protected

set to true to check the number of times each object is written

Definition at line 140 of file AthenaOutputStream.h.

◆ m_CLIDKeyPairs

std::multimap<CLID,std::string> AthenaOutputStream::m_CLIDKeyPairs
protected

map of (clid,key) pairs to be excluded (comes from m_excludeList)

Definition at line 117 of file AthenaOutputStream.h.

◆ m_compInfoKey

SG::WriteHandleKey<SG::CompressionInfo> AthenaOutputStream::m_compInfoKey { this, "CompInfoKey", "" }
private

Key used for recording lossy float compressed variable information to the event store.

Definition at line 174 of file AthenaOutputStream.h.

◆ m_compressionBitsHigh

unsigned int AthenaOutputStream::m_compressionBitsHigh
protected

Number of mantissa bits used in the high float compression.

Definition at line 100 of file AthenaOutputStream.h.

◆ m_compressionBitsLow

unsigned int AthenaOutputStream::m_compressionBitsLow
protected

Number of mantissa bits used in the low float compression.

Definition at line 102 of file AthenaOutputStream.h.

◆ m_compressionDecoderHigh

ToolHandle<SG::IFolder> AthenaOutputStream::m_compressionDecoderHigh
protected

the top-level folder with items to be compressed high

Definition at line 111 of file AthenaOutputStream.h.

◆ m_compressionDecoderLow

ToolHandle<SG::IFolder> AthenaOutputStream::m_compressionDecoderLow
protected

the top-level folder with items to be compressed low

Definition at line 113 of file AthenaOutputStream.h.

◆ m_compressionListHigh

StringArrayProperty AthenaOutputStream::m_compressionListHigh
protected

Vector of item names to be compressed high.

Definition at line 96 of file AthenaOutputStream.h.

◆ m_compressionListLow

StringArrayProperty AthenaOutputStream::m_compressionListLow
protected

Vector of item names to be compressed low.

Definition at line 98 of file AthenaOutputStream.h.

◆ m_currentStore

ServiceHandle<StoreGateSvc>* AthenaOutputStream::m_currentStore
protected

Definition at line 65 of file AthenaOutputStream.h.

◆ m_dataStore

ServiceHandle<StoreGateSvc> AthenaOutputStream::m_dataStore
protected

handle to the StoreGateSvc store where the data we want to write out resides

Definition at line 63 of file AthenaOutputStream.h.

◆ m_decoder

ToolHandle<SG::IFolder> AthenaOutputStream::m_decoder
protected

the top-level folder with items to be written

Definition at line 109 of file AthenaOutputStream.h.

◆ m_dictLoader

ServiceHandle<IDictLoaderSvc> AthenaOutputStream::m_dictLoader
protected

Definition at line 70 of file AthenaOutputStream.h.

◆ m_events

std::atomic<int> AthenaOutputStream::m_events
protected

Number of events written to this output stream.

Definition at line 127 of file AthenaOutputStream.h.

◆ m_excludeList

StringArrayProperty AthenaOutputStream::m_excludeList {this,"ExcludeList",{},"List of metadata items to write","OrderedSet<std::string>"}
protected

Vector of item names to be excluded.

Definition at line 94 of file AthenaOutputStream.h.

◆ m_extendProvenanceRecord

bool AthenaOutputStream::m_extendProvenanceRecord
protected

Set to false to omit adding the current DataHeader to the DataHeader history. This causes the input file to be neglected for back navigation (replace mode).

Definition at line 132 of file AthenaOutputStream.h.

◆ m_forceRead

bool AthenaOutputStream::m_forceRead
protected

set to true to force read of data objects in item list

Definition at line 129 of file AthenaOutputStream.h.

◆ m_helperTools

ToolHandleArray<IAthenaOutputTool> AthenaOutputStream::m_helperTools
protected

vector of AlgTools that are executed by this stream

Definition at line 148 of file AthenaOutputStream.h.

◆ m_incidentSvc

ServiceHandle<IIncidentSvc> AthenaOutputStream::m_incidentSvc
protected

Definition at line 73 of file AthenaOutputStream.h.

◆ m_itemList

StringArrayProperty AthenaOutputStream::m_itemList {this,"ItemList",{},"List of items to write","OutputStreamItemList"}
protected

Vector of item names.

Definition at line 90 of file AthenaOutputStream.h.

◆ m_itemListFromTool

bool AthenaOutputStream::m_itemListFromTool
protected

set to true to write out everything in the input DataHeader

Definition at line 138 of file AthenaOutputStream.h.

◆ m_itemSvc

ServiceHandle<IItemListSvc> AthenaOutputStream::m_itemSvc
protected

Definition at line 67 of file AthenaOutputStream.h.

◆ m_metadataItemList

StringArrayProperty AthenaOutputStream::m_metadataItemList {this,"MetadataItemList",{},"List of metadata items to write","OutputStreamItemList"}
protected

Vector of item names.

Definition at line 92 of file AthenaOutputStream.h.

◆ m_metadataStore

ServiceHandle<StoreGateSvc> AthenaOutputStream::m_metadataStore
protected

Definition at line 64 of file AthenaOutputStream.h.

◆ m_metaDataSvc

ServiceHandle<MetaDataSvc> AthenaOutputStream::m_metaDataSvc
protected

Definition at line 68 of file AthenaOutputStream.h.

◆ m_mutex

mutex_t AthenaOutputStream::m_mutex
protected

Definition at line 164 of file AthenaOutputStream.h.

◆ m_objects

IDataSelector AthenaOutputStream::m_objects
protected

Collection of objects being selected.

Definition at line 119 of file AthenaOutputStream.h.

◆ m_objectWriteCounter

CounterMapType AthenaOutputStream::m_objectWriteCounter
protected

Definition at line 143 of file AthenaOutputStream.h.

◆ m_outputAttributes

std::string AthenaOutputStream::m_outputAttributes
protected

Definition at line 81 of file AthenaOutputStream.h.

◆ m_outputName

std::string AthenaOutputStream::m_outputName
protected

Name of the output file.

Definition at line 80 of file AthenaOutputStream.h.

◆ m_outSeqSvc

ServiceHandle<OutputStreamSequencerSvc> AthenaOutputStream::m_outSeqSvc
protected

Definition at line 85 of file AthenaOutputStream.h.

◆ m_ownedObjects

std::vector<std::unique_ptr<DataObject> > AthenaOutputStream::m_ownedObjects
protected

Collection of DataObject instances owned by this service.

FIXME: it would be simpler to just have m_objects be a vector of DataObjectSharedPtr<DataObject>, but that implies interface changes.

Definition at line 125 of file AthenaOutputStream.h.

◆ m_p2BWritten

ToolHandle<SG::IFolder> AthenaOutputStream::m_p2BWritten
protected

the top-level folder with items to be written

Definition at line 107 of file AthenaOutputStream.h.

◆ m_pCLIDSvc

IClassIDSvc_t AthenaOutputStream::m_pCLIDSvc
protected

Definition at line 84 of file AthenaOutputStream.h.

◆ m_persName

std::string AthenaOutputStream::m_persName
protected

Name of the persistency service capable of writing data from the store.

Definition at line 76 of file AthenaOutputStream.h.

◆ m_rangeIDforRangeFN

std::map< std::string, std::string > AthenaOutputStream::m_rangeIDforRangeFN
protected

map of RangeIDs (as used by the Sequencer) for each Range filename generated

Definition at line 158 of file AthenaOutputStream.h.

◆ m_selVetoesKey

SG::WriteHandleKey<SG::SelectionVetoes> AthenaOutputStream::m_selVetoesKey { this, "SelVetoesKey", "" }
private

Key used for recording selected dynamic variable information to the event store.

Definition at line 169 of file AthenaOutputStream.h.

◆ m_slotRangeMap

std::map< unsigned, std::string > AthenaOutputStream::m_slotRangeMap
protected

Map from each active slot to the filename assigned to it.

Definition at line 155 of file AthenaOutputStream.h.
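
Taken together, m_slotRangeMap and m_rangeIDforRangeFN suggest a two-step lookup from a slot to a RangeID. The sketch below shows that chain with plain standard-library maps. It assumes the first map is keyed by slot number and the second by Range filename. The function `rangeIdForSlot` is hypothetical and is not part of the class.

```cpp
#include <map>
#include <string>

// Same shapes as m_slotRangeMap and m_rangeIDforRangeFN.
using SlotRangeMap = std::map<unsigned, std::string>;
using RangeIdMap = std::map<std::string, std::string>;

// Hypothetical lookup: return the RangeID associated with `slot`,
// or an empty string if the slot or its filename is unknown.
inline std::string rangeIdForSlot(const SlotRangeMap& slots,
                                  const RangeIdMap& ids,
                                  unsigned slot) {
  const auto s = slots.find(slot);
  if (s == slots.end()) return std::string();
  const auto r = ids.find(s->second);
  if (r == ids.end()) return std::string();
  return r->second;
}
```

Using find() rather than operator[] keeps the lookup read-only, so a query for an unknown slot does not insert an empty entry into either map.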

◆ m_streamer

ToolHandle<IAthenaOutputStreamTool> AthenaOutputStream::m_streamer
protected

Handle to the AthenaOutputStreamTool.

Definition at line 146 of file AthenaOutputStream.h.

◆ m_streamerMap

std::map< std::string, std::unique_ptr<IAthenaOutputStreamTool> > AthenaOutputStream::m_streamerMap
protected

Map of streamer tools handling event ranges in multithreaded (MT) mode.

Definition at line 161 of file AthenaOutputStream.h.

◆ m_streamName

StringProperty AthenaOutputStream::m_streamName {this, "StreamName", "", "name of the output stream"}
protected

Stream name (defaults to algorithm name)

Definition at line 88 of file AthenaOutputStream.h.

◆ m_tpCnvSvc

ServiceHandle<ITPCnvSvc> AthenaOutputStream::m_tpCnvSvc
protected

Definition at line 71 of file AthenaOutputStream.h.

◆ m_transient

ToolHandle<SG::IFolder> AthenaOutputStream::m_transient
protected

Decoded list of transient ids.

Definition at line 115 of file AthenaOutputStream.h.

◆ m_transientItems

StringArrayProperty AthenaOutputStream::m_transientItems
protected

List of items that are known to be present in the transient store (and hence we can make input dependencies on them).

Definition at line 105 of file AthenaOutputStream.h.

◆ m_writeMetadataAndDisconnect

bool AthenaOutputStream::m_writeMetadataAndDisconnect = false
protected

Definition at line 151 of file AthenaOutputStream.h.

◆ m_writeOnExecute

bool AthenaOutputStream::m_writeOnExecute
protected

set to true to trigger streaming of data on execute()

Definition at line 134 of file AthenaOutputStream.h.

◆ m_writeOnFinalize

bool AthenaOutputStream::m_writeOnFinalize
protected

set to true to trigger streaming of data on finalize()

Definition at line 136 of file AthenaOutputStream.h.

◆ m_writingTool

StringProperty AthenaOutputStream::m_writingTool
protected

Name of the OutputStreamTool used for writing.

Definition at line 78 of file AthenaOutputStream.h.


The documentation for this class was generated from the following files: