ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaOutputStream Class Reference

algorithm that marks for write data objects in SG More...

#include <AthenaOutputStream.h>

Inheritance diagram for AthenaOutputStream:

Public Types

typedef std::vector< SG::DataProxy * > Items
 typedefs
typedef std::vector< std::pair< std::string, std::string > > TypeKeyPairs
typedef std::recursive_mutex mutex_t

Public Member Functions

 AthenaOutputStream (const std::string &name, ISvcLocator *pSvcLocator)
 Standard algorithm Constructor.
virtual ~AthenaOutputStream ()
 Standard Destructor.

implement IAlgorithm

ServiceHandle< StoreGateSvcm_dataStore {this, "Store", "StoreGateSvc/StoreGateSvc", "Handle to event store"}
 Handle to the StoreGateSvc store where the data we want to write out resides.
ServiceHandle< StoreGateSvcm_metadataStore {this, "MetadataStore", "StoreGateSvc/MetaDataStore", "Handle to metadata store"}
ServiceHandle< StoreGateSvc > * m_currentStore
ServiceHandle< MetaDataSvcm_metaDataSvc {this, "MetaDataSvc", "MetaDataSvc"}
 Handles to all the necessary services.
ServiceHandle< IDictLoaderSvcm_dictLoader {this, "AthDictLoaderSvc", "AthDictLoaderSvc"}
ServiceHandle< ITPCnvSvcm_tpCnvSvc {this, "AthTPCnvSvc", "AthTPCnvSvc"}
ServiceHandle< IIncidentSvc > m_incidentSvc {this, "IncidentSvc", "IncidentSvc"}
ServiceHandle< IClassIDSvc > m_pCLIDSvc {this, "ClassIDSvc", "ClassIDSvc"}
ServiceHandle< OutputStreamSequencerSvcm_outSeqSvc {this, "OutputStreamSequencerSvc", "OutputStreamSequencerSvc"}
StringProperty m_streamName {this, "StreamName", "", "Name of the output stream"}
 Stream name (defaults to algorithm name)
StringArrayProperty m_itemList {this, "ItemList", {}, "List of items to write", "OutputStreamItemList"}
 Vector of item names.
StringArrayProperty m_metadataItemList {this, "MetadataItemList", {}, "List of metadata items to write","OutputStreamItemList"}
 Vector of item names.
StringProperty m_keepProvenances
 Provenance record selection.
StringArrayProperty m_compressionListHigh {this, "CompressionListHigh", {}, "Lossy float compression list (high)"}
 Vector of item names.
StringArrayProperty m_compressionListLow {this, "CompressionListLow", {}, "Lossy float compression list (low)"}
 Vector of item names.
UnsignedIntegerProperty m_compressionBitsHigh {this, "CompressionBitsHigh", 7, "Lossy float compression bits (high)"}
 Number of mantissa bits in the float compression.
UnsignedIntegerProperty m_compressionBitsLow {this, "CompressionBitsLow", 15, "Lossy float compression bits (low)"}
 Number of mantissa bits in the float compression.
StringArrayProperty m_transientItems {this, "TransientItems", {}, "Transient item list"}
 List of items that are known to be present in the transient store (and hence we can make input dependencies on them).
StringProperty m_outputName {this, "OutputFile", "DidNotNameOutput.root", "Name of the output file"}
 Name of the output file.
StringProperty m_persName {this, "EvtConversionSvc", "EventPersistencySvc", "Name of the persistency service writing data"}
 Name of the persistency service capable to write data from the store.
BooleanProperty m_forceRead {this, "ForceRead", true, "Force read data objects in ItemList"}
 set to true to force read of data objects in item list
BooleanProperty m_extendProvenanceRecord {this, "ExtendProvenanceRecord", true, "Extend provenance record"}
 Set to false to omit adding the current DataHeader into the DataHeader history This will cause the input file to be neglected for back navigation (replace mode).
BooleanProperty m_itemListFromTool {this, "TakeItemsFromInput", false, "Write everything in input DataHeader to output"}
 Set to write out everything in input DataHeader.
ToolHandle< SG::IFolderm_p2BWritten
 The top-level folder with items to be written.
ToolHandle< SG::IFolderm_compressionDecoderHigh
 The top-level folder with items to be compressed high.
ToolHandle< SG::IFolderm_compressionDecoderLow
 The top-level folder with items to be compressed low.
ToolHandle< SG::IFolderm_transient
 Decoded list of transient ids.
std::multimap< CLID, std::string > m_CLIDKeyPairs
 Map of (clid,key) pairs to be excluded (comes from m_excludeList)
IDataSelector m_objects
 Collection of objects being selected.
IDataSelector m_altObjects
 Objects overridden by ‘exact’ handling.
std::vector< std::unique_ptr< DataObject > > m_ownedObjects
 Collection of DataObject instances owned by this service.
ToolHandle< IAthenaOutputStreamToolm_streamer
 pointer to AthenaOutputStreamTool
ToolHandleArray< IAthenaOutputToolm_helperTools {this, "HelperTools", {}, "List of AlgTools used by this stream"}
 vector of AlgTools that that are executed by this stream
bool m_writeMetadataAndDisconnect {false}
std::atomic< int > m_events {0}
 Number of events written to this output stream.
std::map< unsigned, std::string > m_slotRangeMap
 map of filenames assigned to active slots
std::map< std::string, std::string > m_rangeIDforRangeFN
 map of RangeIDs (as used by the Sequencer) for each Range filename generated
std::map< std::string, std::unique_ptr< IAthenaOutputStreamTool > > m_streamerMap
 map of streamerTools handling event ranges in MT
mutex_t m_mutex
 mutex for this Stream write() and handle() methods
SG::WriteHandleKey< SG::SelectionVetoesm_selVetoesKey { this, "SelVetoesKey", "" }
 Key used for recording selected dynamic variable information to the event store.
SG::WriteHandleKey< SG::CompressionInfom_compInfoKey { this, "CompInfoKey", "" }
 Key used for recording lossy float compressed variable information to the event store.
std::string m_outputAttributes
 Output attributes.
virtual StatusCode initialize () override
virtual StatusCode finalize () override
virtual StatusCode execute () override
virtual StatusCode write ()
 Stream the data.
void clearSelection ()
 Clear list of selected objects.
StatusCode collectAllObjects ()
 Collect data objects for output streamer list.
IDataSelector * selectedObjects ()
 Return the list of selected objects.
virtual void handle (const Incident &incident) override
 Incident service handle listening for MetaDataStop.
virtual StatusCode io_reinit () override
 Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon fork(2))
virtual StatusCode io_finalize () override
void itemListHandler (Gaudi::Details::PropertyBase &)
 Handler for ItemNames Property.
void excludeListHandler (Gaudi::Details::PropertyBase &)
 Handler for ItemNames Property.
void compressionListHandlerHigh (Gaudi::Details::PropertyBase &)
 Handler for ItemNames Property.
void compressionListHandlerLow (Gaudi::Details::PropertyBase &)
 Handler for ItemNames Property.
StatusCode addItemObjects (const SG::FolderItem &, SG::SelectionVetoes &vetoes, SG::CompressionInfo &compInfo)
 Add item data objects to output streamer list.
void handleVariableSelection (const SG::IConstAuxStore &auxstore, SG::DataProxy &itemProxy, const std::string &aux_attr, SG::SelectionVetoes &vetoes) const
 Here we build the vetoed AuxIDs.
void writeMetaData (const std::string &outputFN="")
 Write MetaData for this stream (by default) or for a substream outputFN (in ES mode)
std::set< std::string > buildCompressionSet (const ToolHandle< SG::IFolder > &handle, const CLID &item_id, const std::string &item_key) const
 Helper function for building the compression lists.
void finalizeRange (const std::string &rangeFN)
void loadDict (CLID clid)
 Helper function to load dictionaries (both transient and persistent) for a given type.
bool simpleMatch (const std::string &pattern, const std::string &text)
 Glob-style matcher, where the only meta-character is '*'.

Detailed Description

algorithm that marks for write data objects in SG

Author
srini.nosp@m.r@bn.nosp@m.l.gov

Definition at line 53 of file AthenaOutputStream.h.

Member Typedef Documentation

◆ Items

typedefs

Definition at line 58 of file AthenaOutputStream.h.

◆ mutex_t

typedef std::recursive_mutex AthenaOutputStream::mutex_t

Definition at line 60 of file AthenaOutputStream.h.

◆ TypeKeyPairs

typedef std::vector<std::pair<std::string, std::string> > AthenaOutputStream::TypeKeyPairs

Definition at line 59 of file AthenaOutputStream.h.

Constructor & Destructor Documentation

◆ AthenaOutputStream()

AthenaOutputStream::AthenaOutputStream ( const std::string & name,
ISvcLocator * pSvcLocator )

Standard algorithm Constructor.

Definition at line 43 of file AthenaOutputStream.cxx.

44 : base_class(name, pSvcLocator),
46 m_p2BWritten(std::format("SG::Folder/{}_TopFolder", name), this),
47 m_compressionDecoderHigh(std::format("SG::Folder/{}_compressed_high", name), this),
48 m_compressionDecoderLow(std::format("SG::Folder/{}_compressed_low", name), this),
49 m_transient(std::format("SG::Folder/{}_transient", name), this),
50 m_streamer(std::format("AthenaOutputStreamTool/{}Tool", name), this)
51{
52 // Ensure the service locater is good
53 assert(pSvcLocator);
54
55 // This property depends on the name that's known at construction time
56 // Therefore, do it the old fashioned way
57 declareProperty("WritingTool", m_streamer);
58
59 // Associate action handlers with the AcceptAlgs,
60 // RequireAlgs & VetoAlgs properties
61 m_itemList.declareUpdateHandler(&AthenaOutputStream::itemListHandler, this);
64}
ToolHandle< SG::IFolder > m_compressionDecoderLow
The top-level folder with items to be compressed low.
ToolHandle< SG::IFolder > m_p2BWritten
The top-level folder with items to be written.
StringArrayProperty m_compressionListLow
Vector of item names.
ToolHandle< IAthenaOutputStreamTool > m_streamer
pointer to AthenaOutputStreamTool
ToolHandle< SG::IFolder > m_compressionDecoderHigh
The top-level folder with items to be compressed high.
StringArrayProperty m_itemList
Vector of item names.
void compressionListHandlerLow(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
StringArrayProperty m_compressionListHigh
Vector of item names.
ServiceHandle< StoreGateSvc > * m_currentStore
ToolHandle< SG::IFolder > m_transient
Decoded list of transient ids.
ServiceHandle< StoreGateSvc > m_dataStore
Handle to the StoreGateSvc store where the data we want to write out resides.
void compressionListHandlerHigh(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
void itemListHandler(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.

◆ ~AthenaOutputStream()

AthenaOutputStream::~AthenaOutputStream ( )
virtual

Standard Destructor.

Definition at line 67 of file AthenaOutputStream.cxx.

67 {
68 // Clear the internal caches
69 m_streamerMap.clear();
70}
std::map< std::string, std::unique_ptr< IAthenaOutputStreamTool > > m_streamerMap
map of streamerTools handling event ranges in MT

Member Function Documentation

◆ addItemObjects()

StatusCode AthenaOutputStream::addItemObjects ( const SG::FolderItem & item,
SG::SelectionVetoes & vetoes,
SG::CompressionInfo & compInfo )
private

Add item data objects to output streamer list.

Handle variable selections. Both variable selection and lossy float compression are limited to event data for the time being

Definition at line 567 of file AthenaOutputStream.cxx.

570{
571 // anything after a dot is a list of dynamic Aux attributes, separated by dots
572 size_t dotpos = item.key().find('.');
573 std::string item_key, aux_attr;
574 if( dotpos != std::string::npos ) {
575 item_key = item.key().substr(0, dotpos+1);
576 aux_attr = item.key().substr(dotpos+1);
577 } else {
578 item_key = item.key();
579 }
580 CLID item_id = item.id();
581 ATH_MSG_DEBUG(std::format("addItemObjects({},\"{}\") called", item_id, item_key));
582 ATH_MSG_DEBUG(std::format(" Key:{}", item_key));
583 if( aux_attr.size() ) {
584 ATH_MSG_DEBUG(std::format(" Aux Attr:{}", aux_attr));
585 }
586
587 // Here we build the list of attributes for the lossy float compression
588 // Note that we do not allow m_compressionBitsHigh >= m_compressionBitsLow
589 // Otherwise is, in any case, a logical error and they'd potentially overwrite each other
590 std::map< unsigned int, std::set< std::string > > comp_attr_map;
591 comp_attr_map[ m_compressionBitsHigh ] = buildCompressionSet( m_compressionDecoderHigh, item_id, item_key );
592 comp_attr_map[ m_compressionBitsLow ] = buildCompressionSet( m_compressionDecoderLow, item_id, item_key );
593
594 // Print some debugging information regarding the lossy float compression configuration
595 for( const auto& it : comp_attr_map ) {
596 ATH_MSG_DEBUG(std::format(" Comp Attr {} with {} mantissa bits.", it.second.size(), it.first));
597 if ( it.second.size() > 0 ) {
598 for( const auto& attr : it.second ) {
599 ATH_MSG_DEBUG(std::format(" >> {}", attr));
600 }
601 }
602 }
603
604 // For MetaData objects of type T that are kept in MetaContainers get the MetaCont<T> ID
605 const CLID remapped_item_id = m_metaDataSvc->remapMetaContCLID( item_id );
607 SG::ProxyMap map;
608 bool gotProxies = false;
609 // Look for the clid in storegate
610 SG::DataProxy* match = (*m_currentStore)->proxy(remapped_item_id, item_key, true);
611 if (match != nullptr) {
612 map.insert({item_key, match});
613 iter = map.begin();
614 end = map.end();
615 gotProxies = true;
616 }
617 // Look for the clid in storegate
618 if (!gotProxies && ((*m_currentStore)->proxyRange(remapped_item_id, iter, end)).isSuccess()) {
619 gotProxies = true;
620 }
621 if (gotProxies) {
622 bool added = false, removed = false;
623 // Now loop over any found proxies
624 for (; iter != end; ++iter) {
625 SG::DataProxy* itemProxy(iter->second);
626 std::string proxyName = itemProxy->name();
627 std::string stream;
629 // only check metadata keys
630 stream = m_metaDataSvc->removeStreamFromKey(proxyName); // can modify proxyName
631 }
632 // Does this key match the proxy key name - allow for wildcarding and aliases
633 bool keyMatch = ( item_key == "*" ||
634 item_key == proxyName ||
635 itemProxy->hasAlias(item_key) );
636 if (!keyMatch) {
637 // For item list we currently allow wildcards ('*'), which has limited use, e.g.:
638 // xAOD::CutBookkeeperAuxContainer#IncompleteCutBookkeepers*Aux.
639 // Here we look for those few cases...
640 keyMatch = simpleMatch(item_key, proxyName);
641 ATH_MSG_DEBUG(std::format("Result of checking {} against {} to see if it matches is {}",
642 proxyName, item_key, keyMatch));
643 }
644
645 // Now check if this item is marked for another output stream, if so we reject it
646 // We also reject keys that are marked transient at this point
647 bool xkeyMatch = false;
648 if( (!stream.empty() and stream != m_outputName) || SG::isTransientKey(proxyName) ) {
649 // reject keys that are marked for a different output stream
650 ATH_MSG_DEBUG(std::format("Rejecting key: {} in output: {}", itemProxy->name(), m_outputName.toString()));
651 xkeyMatch = true;
652 }
653
654 // All right, it passes key match find in itemList, but not in excludeList
655 if (keyMatch && !xkeyMatch) {
656 if (m_forceRead && itemProxy->isValid()) {
657 if (nullptr == itemProxy->accessData()) {
658 ATH_MSG_ERROR(std::format(" Could not get data object for id {},\"{}\"", remapped_item_id, proxyName));
659 }
660 }
661 if (nullptr != itemProxy->object()) {
662 if( std::find(m_objects.begin(), m_objects.end(), itemProxy->object()) == m_objects.end() &&
663 std::find(m_altObjects.begin(), m_altObjects.end(), itemProxy->object()) == m_altObjects.end() )
664 {
665 if( item_id != remapped_item_id ) {
666 // For MetaCont<T>: -
667 // create a temporary DataObject for an entry in the container to pass to CnvSvc
668 DataBucketBase* dbb = static_cast<DataBucketBase*>( itemProxy->object() );
669 const MetaContBase* metaCont = static_cast<MetaContBase*>( dbb->cast( ClassID_traits<MetaContBase>::ID() ) );
670 void* obj = metaCont? metaCont->getAsVoid( m_outSeqSvc->currentRangeID() ) : nullptr;
671 if( obj ) {
672 auto altbucket = std::make_unique<AltDataBucket>(
673 obj, item_id, *CLIDRegistry::CLIDToTypeinfo(item_id), proxyName );
674 m_objects.push_back( altbucket.get() );
675 m_ownedObjects.push_back( std::move(altbucket) );
676 m_altObjects.push_back( itemProxy->object() ); // only for duplicate prevention
677 } else {
678 ATH_MSG_ERROR(std::format("Failed to retrieve object from MetaCont with key={}, for EventRangeID={}",
679 item_key, m_outSeqSvc->currentRangeID()));
680 return StatusCode::FAILURE;
681 }
682 } else if (item.exact()) {
683 // If the exact flag is set, make a new DataObject
684 // holding the object as the requested type.
685 DataBucketBase* dbb = dynamic_cast<DataBucketBase*> (itemProxy->object());
686 if (!dbb) std::abort();
687 void* ptr = dbb->cast (item_id);
688 if (!ptr) {
689 // Hard cast
690 ptr = dbb->object();
691 }
692 auto altbucket =
693 std::make_unique<AltDataBucket>
694 (ptr, item_id,
696 *itemProxy);
697 m_objects.push_back(altbucket.get());
698 m_ownedObjects.push_back (std::move(altbucket));
699 m_altObjects.push_back (itemProxy->object());
700 }
701 else
702 m_objects.push_back(itemProxy->object());
703 ATH_MSG_DEBUG(std::format(" Added object {},\"{}\"", item_id, proxyName));
704 }
705
709 if ((*m_currentStore)->storeID() == StoreID::EVENT_STORE &&
710 item_key.find( RootAuxDynIO::AUX_POSTFIX ) == ( item_key.size() - 4 )) {
711
712 const SG::IConstAuxStore* auxstore( nullptr );
713 try {
714 SG::fromStorable( itemProxy->object(), auxstore, true );
715 } catch( const std::exception& ) {
716 ATH_MSG_DEBUG(std::format("Error in casting object with CLID {} to SG::IConstAuxStore*", itemProxy->clID()));
717 auxstore = nullptr;
718 }
719
720 if (auxstore) {
721 handleVariableSelection (*auxstore, *itemProxy,
722 aux_attr, vetoes);
723
724 // Here comes the compression logic using ThinningInfo
725 // Get a hold of all AuxIDs for this store (static, dynamic etc.)
726 const SG::auxid_set_t allVars = auxstore->getAuxIDs();
727
728 // Get a handle on the compression information for this store
729 std::string key = item_key;
730 key.erase (key.size()-4, 4);
731
732 // Build the compression list, retrieve the relevant AuxIDs and
733 // store it in the relevant map that is going to be inserted into
734 // the ThinningCache later on by the ThinningCacheTool
735 xAOD::AuxCompression compression;
736 compression.setCompressedAuxIDs( comp_attr_map );
737 for( const auto& it : compression.getCompressedAuxIDs( allVars ) ) {
738 if( it.second.size() > 0 ) { // insert only if the set is non-empty
739 compInfo[ key ][ it.first ] = it.second;
740 ATH_MSG_DEBUG(std::format("Container {} has {} variables that'll be "
741 "lossy float compressed with {} mantissa bits",
742 key, it.second.size(), it.first));
743 }
744 } // End of loop over variables to be lossy float compressed
745 } // End of lossy float compression logic
746
747 }
748
749 added = true;
750 }
751 } else if (keyMatch && xkeyMatch) {
752 removed = true;
753 }
754 } // proxy loop
755 if (!added && !removed) {
756 ATH_MSG_DEBUG(std::format(" No object matching {},\"{}\" found", item_id, item_key));
757 } else if (removed) {
758 ATH_MSG_DEBUG(std::format(" Object being excluded based on property setting {},\"{}\". Skipping",
759 item_id, item_key));
760 }
761 } else {
762 ATH_MSG_DEBUG(std::format(" Failed to receive proxy iterators from StoreGate for {},\"{}\". Skipping",
763 item_id, item_key));
764 }
765 return StatusCode::SUCCESS;
766}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_DEBUG(x)
uint32_t CLID
The Class ID type.
IDataSelector m_objects
Collection of objects being selected.
ServiceHandle< StoreGateSvc > m_metadataStore
UnsignedIntegerProperty m_compressionBitsLow
Number of mantissa bits in the float compression.
UnsignedIntegerProperty m_compressionBitsHigh
Number of mantissa bits in the float compression.
ServiceHandle< MetaDataSvc > m_metaDataSvc
Handles to all the necessary services.
StringProperty m_outputName
Name of the output file.
std::set< std::string > buildCompressionSet(const ToolHandle< SG::IFolder > &handle, const CLID &item_id, const std::string &item_key) const
Helper function for building the compression lists.
void handleVariableSelection(const SG::IConstAuxStore &auxstore, SG::DataProxy &itemProxy, const std::string &aux_attr, SG::SelectionVetoes &vetoes) const
Here we build the vetoed AuxIDs.
IDataSelector m_altObjects
Objects overridden by ‘exact’ handling.
BooleanProperty m_forceRead
set to true to force read of data objects in item list
std::vector< std::unique_ptr< DataObject > > m_ownedObjects
Collection of DataObject instances owned by this service.
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
bool simpleMatch(const std::string &pattern, const std::string &text)
Glob-style matcher, where the only meta-character is '*'.
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
virtual void * object()=0
T * cast(SG::IRegisterTransient *irt=0, bool isConst=true)
Return the contents of the DataBucket, converted to type T.
virtual void * getAsVoid(const SourceID &sid) const =0
@ EVENT_STORE
Definition StoreID.h:26
virtual std::map< unsigned int, SG::auxid_set_t > getCompressedAuxIDs(const SG::auxid_set_t &fullset) const
Return those variables that are selected to be compressed per compression setting.
virtual void setCompressedAuxIDs(const std::map< unsigned int, std::set< std::string > > &attributes)
Set which variables should be compressed per compression setting.
bool match(std::string s1, std::string s2)
match the individual directories of two strings
Definition hcg.cxx:357
constexpr char AUX_POSTFIX[]
Common post-fix for the names of auxiliary containers in StoreGate.
std::map< std::string, DataProxy * > ProxyMap
Definition ProxyMap.h:22
bool fromStorable(DataObject *pDObj, T *&pTrans, bool quiet=false, IRegisterTransient *irt=0, bool isConst=true)
ProxyMap::const_iterator ConstProxyIterator
Definition ProxyMap.h:24
bool isTransientKey(const std::string &key)
Test to see if a key is transoent.
void * ptr(T *p)
Definition SGImplSvc.cxx:74

◆ buildCompressionSet()

std::set< std::string > AthenaOutputStream::buildCompressionSet ( const ToolHandle< SG::IFolder > & handle,
const CLID & item_id,
const std::string & item_key ) const
private

Helper function for building the compression lists.

Here we build the list of attributes for the float compression CompressionList follows the same logic as the ItemList We find the matching keys, read the string after "Aux.", tokenize by "." and build an std::set of these to be communicated to ThinningInfo elsewhere in the code.

Definition at line 774 of file AthenaOutputStream.cxx.

777{
778 // Create an empty result
779 std::set<std::string> result;
780
781 // Check the item is indeed Aux.
782 if(item_key.find("Aux.") == std::string::npos) {
783 return result;
784 }
785
786 // First the high compression list
787 for (const auto& iter : *handle) {
788 // First match the IDs for early rejection.
789 if (iter.id() != item_id) {
790 continue;
791 }
792 // Then find the compression item key and the compression list string
793 size_t seppos = iter.key().find('.');
794 std::string comp_item_key{""}, comp_str{""};
795 if(seppos != std::string::npos) {
796 comp_item_key = iter.key().substr(0, seppos+1);
797 comp_str = iter.key().substr(seppos+1);
798 } else {
799 comp_item_key = iter.key();
800 }
801 // Proceed only if the keys match and the
802 // compression list string is not empty
803 if (!comp_str.empty() && comp_item_key == item_key) {
804 std::stringstream ss(comp_str);
805 std::string attr;
806 while( std::getline(ss, attr, '.') ) {
807 result.insert(attr);
808 }
809 }
810 }
811
812 // All done, return the result
813 return result;
814}
static Double_t ss
virtual void handle(const Incident &incident) override
Incident service handle listening for MetaDataStop.

◆ clearSelection()

void AthenaOutputStream::clearSelection ( )

Clear list of selected objects.

Definition at line 526 of file AthenaOutputStream.cxx.

526 {
527 m_objects.clear();
528 m_ownedObjects.clear();
529 m_altObjects.clear();
530}

◆ collectAllObjects()

StatusCode AthenaOutputStream::collectAllObjects ( )

Collect data objects for output streamer list.

Definition at line 533 of file AthenaOutputStream.cxx.

533 {
534 if (m_itemListFromTool) {
535 if (!m_streamer->getInputItemList(&*m_p2BWritten).isSuccess()) {
536 ATH_MSG_WARNING("collectAllObjects() could not get ItemList from Tool.");
537 }
538 }
539
540 // This holds the vetoes for the AuxID selection
541 auto vetoes = std::make_unique<SG::SelectionVetoes>();
542 // This holds the lossy float compression information
543 auto compInfo = std::make_unique<SG::CompressionInfo>();
544
545 m_p2BWritten->updateItemList(true);
546 // Collect all objects that need to be persistified:
547 for (const auto& i : *m_p2BWritten) {
548 ATH_CHECK( addItemObjects(i, *vetoes, *compInfo) );
549 }
550
551 // If there were any variable selections, record the information in SG.
552 if (!vetoes->empty()) {
553 ATH_CHECK( SG::makeHandle (m_selVetoesKey).record (std::move (vetoes)) );
554 }
555
556 // Store the lossy float compression information in the SG.
557 if (!compInfo->empty()) {
558 ATH_CHECK( SG::makeHandle (m_compInfoKey).record (std::move (compInfo)) );
559 }
560
561 return StatusCode::SUCCESS;
562}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
BooleanProperty m_itemListFromTool
Set to write out everything in input DataHeader.
StatusCode addItemObjects(const SG::FolderItem &, SG::SelectionVetoes &vetoes, SG::CompressionInfo &compInfo)
Add item data objects to output streamer list.
SG::WriteHandleKey< SG::CompressionInfo > m_compInfoKey
Key used for recording lossy float compressed variable information to the event store.
SG::WriteHandleKey< SG::SelectionVetoes > m_selVetoesKey
Key used for recording selected dynamic variable information to the event store.
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())

◆ compressionListHandlerHigh()

void AthenaOutputStream::compressionListHandlerHigh ( Gaudi::Details::PropertyBase & )
protected

Handler for ItemNames Property.

Definition at line 873 of file AthenaOutputStream.cxx.

873 {
874 IProperty *pAsIProp(nullptr);
875 if ((m_compressionDecoderHigh.retrieve()).isFailure() ||
876 nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_compressionDecoderHigh)) ||
877 (pAsIProp->setProperty("ItemList", m_compressionListHigh.toString())).isFailure()) {
878 throw GaudiException("Folder property [ItemList] not found", name(), StatusCode::FAILURE);
879 }
880}

◆ compressionListHandlerLow()

void AthenaOutputStream::compressionListHandlerLow ( Gaudi::Details::PropertyBase & )
protected

Handler for ItemNames Property.

Definition at line 882 of file AthenaOutputStream.cxx.

882 {
883 IProperty *pAsIProp(nullptr);
884 if ((m_compressionDecoderLow.retrieve()).isFailure() ||
885 nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_compressionDecoderLow)) ||
886 (pAsIProp->setProperty("ItemList", m_compressionListLow.toString())).isFailure()) {
887 throw GaudiException("Folder property [ItemList] not found", name(), StatusCode::FAILURE);
888 }
889}

◆ excludeListHandler()

void AthenaOutputStream::excludeListHandler ( Gaudi::Details::PropertyBase & )
protected

Handler for ItemNames Property.

◆ execute()

StatusCode AthenaOutputStream::execute ( )
overridevirtual

Definition at line 406 of file AthenaOutputStream.cxx.

406 {
407 bool failed = false;
408 // Call tool preExecute prior to writing
409 for (auto& tool : m_helperTools) {
410 if (!tool->preExecute().isSuccess()) {
411 failed = true;
412 }
413 }
414 // Write the event if the event is accepted
415 if (isEventAccepted()) {
416 if (write().isFailure()) {
417 failed = true;
418 }
419 }
420 // Call tool postExecute after writing
421 for (auto& tool : m_helperTools) {
422 if(!tool->postExecute().isSuccess()) {
423 failed = true;
424 }
425 }
426 // See if we should write metadata and do if so
430 // finalize will disconnect output
431 if( !finalize().isSuccess() ) {
432 failed = true;
433 }
434 }
435 if (failed) {
436 return(StatusCode::FAILURE);
437 }
438 return(StatusCode::SUCCESS);
439}
virtual StatusCode finalize() override
ToolHandleArray< IAthenaOutputTool > m_helperTools
vector of AlgTools that that are executed by this stream
virtual StatusCode write()
Stream the data.
void writeMetaData(const std::string &outputFN="")
Write MetaData for this stream (by default) or for a substream outputFN (in ES mode)

◆ finalize()

StatusCode AthenaOutputStream::finalize ( )
overridevirtual

Definition at line 378 of file AthenaOutputStream.cxx.

379{
380 bool failed = false;
381 ATH_MSG_DEBUG("finalize: Optimize output");
382 // Connect the output file to the service
383 if (!m_streamer->finalizeOutput().isSuccess()) {
384 failed = true;
385 }
386 ATH_MSG_DEBUG("finalize: end optimize output");
387 // Release the tools
388 if (!m_helperTools.release().isSuccess()) {
389 failed = true;
390 }
391 if (!m_streamer.release().isSuccess()) {
392 failed = true;
393 }
394 if (failed) {
395 return(StatusCode::FAILURE);
396 }
397 // Clear the internal caches
398 m_objects.clear();
399 m_objects.shrink_to_fit();
400 m_ownedObjects.clear();
401 m_altObjects.clear();
402 return(StatusCode::SUCCESS);
403}

◆ finalizeRange()

void AthenaOutputStream::finalizeRange ( const std::string & rangeFN)
private

Definition at line 298 of file AthenaOutputStream.cxx.

299{
300 ATH_MSG_DEBUG(std::format("Writing MetaData to {}", rangeFN));
301 // MN: not calling StopMetaData Incident here but directly writeMetaData() - OK for Sim, check others
302 // metadata tools like CutFlowSvc are not able to handle this yet
303 const std::string rememberID = m_outSeqSvc->setRangeID( m_rangeIDforRangeFN[ rangeFN ] );
304 writeMetaData( rangeFN );
305 m_outSeqSvc->setRangeID( rememberID );
306
307 ATH_MSG_INFO(std::format("Finished writing Event Sequence to {}", rangeFN));
308 auto strm_iter = m_streamerMap.find( rangeFN );
309 strm_iter->second->finalizeOutput().ignore();
310 strm_iter->second->finalize().ignore();
311 m_streamerMap.erase( strm_iter );
312 m_outSeqSvc->publishRangeReport( rangeFN );
313}
#define ATH_MSG_INFO(x)
std::map< std::string, std::string > m_rangeIDforRangeFN
map of RangeIDs (as used by the Sequencer) for each Range filename generated

◆ handle()

void AthenaOutputStream::handle ( const Incident & incident)
overridevirtual

Incident service handle listening for MetaDataStop.

Definition at line 223 of file AthenaOutputStream.cxx.

224{
225 ATH_MSG_DEBUG(std::format("handle() incident type: {}", inc.type()));
226 // mutex shared with write() which is called from writeMetaData
227 std::unique_lock<mutex_t> lock(m_mutex);
228
229 if( inc.type() == "MetaDataStop" ) {
230 if( m_outSeqSvc->inUse() ) {
231 if( m_outSeqSvc->inConcurrentEventsMode() ) {
232 // EventService MT - write metadata and close all remaining substreams
233 while( m_streamerMap.size() > 0 ) {
234 finalizeRange( m_streamerMap.begin()->first );
235 }
236 return;
237 }
238 if( m_outSeqSvc->lastIncident() == "EndEvent" ) {
239 // in r22 EndEvent comes before output writing
240 // - queue metadata writing and disconnect for after Event write
242 return;
243 }
244 }
245 // not in Event Service
247 }
248 else if( m_outSeqSvc->inUse() ) {
249 // Handle Event Ranges for Event Service
250 EventContext::ContextID_t slot = inc.context().slot();
251 if( slot == EventContext::INVALID_CONTEXT_ID ) {
252 throw GaudiException("Received Incident with invalid slot in ES mode", name(), StatusCode::FAILURE);
253 }
254 auto count_events_in_range = [&](const std::string& range) {
255 return std::count_if(m_slotRangeMap.cbegin(), m_slotRangeMap.cend(),
256 [&](auto& el){return el.second == range;} );
257 };
258 if( inc.type() == IncidentType::BeginProcessing ) {
259 // get the current/old range filename for this slot
260 const std::string rangeFN = m_slotRangeMap[ slot ];
261 // build the new range filename for this slot
262 const std::string newRangeFN = m_outSeqSvc->buildSequenceFileName( m_outputName );
263 if( !rangeFN.empty() and rangeFN != newRangeFN ) {
264 ATH_MSG_INFO(std::format("Slot range change: '{}' -> '{}'", rangeFN, newRangeFN));
265 ATH_MSG_DEBUG(std::format("There are {} slots in use",m_slotRangeMap.size()));
266 for(const auto & range : m_slotRangeMap ) {
267 ATH_MSG_DEBUG(std::format("Slot: {} FN={}", range.first, range.second));
268 }
269 if( count_events_in_range(rangeFN) == 1 ) {
270 finalizeRange( rangeFN );
271 }
272 }
273 ATH_MSG_INFO(std::format("slot {} processing event in range: {}", slot, newRangeFN));
274 m_slotRangeMap[ slot ] = newRangeFN;
275 // remember the RangeID for this slot so we can write metadata *after* a range change
276 m_rangeIDforRangeFN[ newRangeFN ] = m_outSeqSvc->currentRangeID();
277 }
278 else if( inc.type() == IncidentType::EndProcessing ) {
279 ATH_MSG_DEBUG(std::format("There are {} slots in use", m_slotRangeMap.size()));
280 for( const auto& range : m_slotRangeMap ) {
281 ATH_MSG_DEBUG(std::format("Slot: {} FN={}", range.first, range.second));
282 }
283 if( m_slotRangeMap.size() > 1 ) {
284 // if there are multiple slots, we can detect if the range ended with this event
285 // - except the last range, because there is no next range to clear the slot map
286 const std::string rangeFN = m_slotRangeMap[ slot ];
287 if( count_events_in_range(rangeFN) == 1 ) {
288 finalizeRange( rangeFN );
289 m_slotRangeMap[ slot ].clear();
290 }
291 }
292 }
293 }
294 ATH_MSG_DEBUG(std::format("Leaving incident handler for {}", inc.type()));
295}
std::map< unsigned, std::string > m_slotRangeMap
map of filenames assigned to active slots
mutex_t m_mutex
mutex for this Stream write() and handle() methods
void finalizeRange(const std::string &rangeFN)

◆ handleVariableSelection()

void AthenaOutputStream::handleVariableSelection ( const SG::IConstAuxStore & auxstore,
SG::DataProxy & itemProxy,
const std::string & aux_attr,
SG::SelectionVetoes & vetoes ) const
private

Here we build the vetoed AuxIDs.

Definition at line 817 of file AthenaOutputStream.cxx.

821{
822 // Collect dynamic Aux selection (parse the line, attributes separated by dot)
823 std::set<std::string> attributes;
824 if( aux_attr.size() ) {
825 std::stringstream ss(aux_attr);
826 std::string attr;
827 while( std::getline(ss, attr, '.') ) {
828 attributes.insert(attr);
829 }
830 }
831
832 // Return early if there's no selection.
833 if (attributes.empty()) {
834 return;
835 }
836
837 std::string key = itemProxy.name();
838 if (key.size() >= 4 && key.compare (key.size()-4, 4, "Aux.")==0)
839 {
840 key.erase (key.size()-4, 4);
841 }
842
843 // Find the entry for the selection.
844 SG::auxid_set_t& vset = vetoes[key];
845
846 // Form the veto mask for this object.
847 xAOD::AuxSelection sel;
848 sel.selectAux (attributes);
849
850 // Get all the AuxIDs that we know of and the selected ones
851 SG::auxid_set_t all = auxstore.getAuxIDs();
852 SG::auxid_set_t selected = sel.getSelectedAuxIDs( all );
853
854 // Loop over all and build a list of vetoed AuxIDs from non selected ones
855 for( const SG::auxid_t auxid : all ) {
856 if ( !selected.test( auxid ) ) {
857 vset.insert( auxid );
858 }
859 }
860}
bool test(bit_t bit) const
Test to see if a bit is set.
ConcurrentBitset & insert(bit_t bit, bit_t new_nbits=0)
Set a bit to 1.
virtual const name_type & name() const override final
Retrieve data object key == string.
virtual const SG::auxid_set_t & getAuxIDs() const =0
Return a set of identifiers for existing data items in this store.
size_t auxid_t
Identifier for a particular aux data item.
Definition AuxTypes.h:27

◆ initialize()

StatusCode AthenaOutputStream::initialize ( )
overridevirtual

Definition at line 73 of file AthenaOutputStream.cxx.

73 {
74 ATH_MSG_DEBUG("In initialize");
75
76 // Initialize the FilteredAlgorithm base
78
79 // Reset the number of events written
80 m_events = 0;
81
82 // Set up the SG services
83 ATH_CHECK( m_dataStore.retrieve() );
84 ATH_MSG_DEBUG(std::format("Found {} store.", m_dataStore.typeAndName()));
85 if (!m_metadataItemList.value().empty()) {
86 ATH_CHECK( m_metadataStore.retrieve() );
87 ATH_MSG_DEBUG(std::format("Found {} store.", m_metadataStore.typeAndName()));
88 }
89
90 // Set up various services
91 ATH_CHECK( m_pCLIDSvc.retrieve() );
92 ATH_CHECK( m_dictLoader.retrieve() );
93 ATH_CHECK( m_tpCnvSvc.retrieve() );
94 ATH_CHECK( m_outSeqSvc.retrieve() );
95
96 // Get Output Stream tool for writing
97 ATH_CHECK( m_streamer.retrieve() );
98 ATH_CHECK( m_streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord) );
99
100 ATH_CHECK( m_helperTools.retrieve() );
101 ATH_MSG_INFO("Found " << m_helperTools);
102 ATH_MSG_INFO(std::format("Data output: {}", m_outputName.toString()));
103
104 for (auto& tool : m_helperTools) {
105 ATH_CHECK( tool->postInitialize() );
106 }
107
108 // Register this algorithm for 'I/O' events
109 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
110 ATH_CHECK( iomgr.retrieve() );
111 ATH_CHECK( iomgr->io_register(this) );
112 ATH_CHECK( iomgr->io_register(this, IIoComponentMgr::IoMode::WRITE, m_outputName) );
113 ATH_CHECK( this->io_reinit() );
114
115 // Add an explicit input dependency for everything in our item list
116 // that we know from the configuration is in the transient store.
117 // We don't want to add everything on the list, because configurations
118 // often initialize this with a maximal static list of everything
119 // that could possibly be written.
120 {
121 ATH_CHECK( m_transient.retrieve() );
122 IProperty *pAsIProp = dynamic_cast<IProperty*> (&*m_transient);
123 if (!pAsIProp) {
124 ATH_MSG_FATAL ("Bad folder interface");
125 return StatusCode::FAILURE;
126 }
127 ATH_CHECK (pAsIProp->setProperty("ItemList", m_transientItems.toString()));
128
129 for (const SG::FolderItem& item : *m_p2BWritten) {
130 // Load ROOT dictionaries now.
131 loadDict (item.id());
132
133 const std::string& k = item.key();
134 if (k.find('*') != std::string::npos) continue;
135 if (k.find('.') != std::string::npos) continue;
136 for (const SG::FolderItem& titem : *m_transient) {
137 if (titem.id() == item.id() && titem.key() == k) {
138 DataObjID id (item.id(), std::format("{}+{}", m_dataStore.name(), k));
139 this->addDependency (id, Gaudi::DataHandle::Reader);
140 break;
141 }
142 }
143 }
144 m_transient->clear();
145 }
146
147 // Also load dictionaries for metadata classes.
148 if (!m_metadataItemList.value().empty()) {
149 IProperty *pAsIProp = dynamic_cast<IProperty*> (&*m_transient);
150 if (!pAsIProp) {
151 ATH_MSG_FATAL ("Bad folder interface");
152 return StatusCode::FAILURE;
153 }
154 ATH_CHECK (pAsIProp->setProperty("ItemList", m_metadataItemList.toString()));
155 for (const SG::FolderItem& item : *m_transient) {
156 loadDict (item.id());
157 }
158 m_transient->clear();
159 }
160
161 // Also make sure we have the dictionary for Token.
162 m_dictLoader->load_type ("Token");
163
164 // Listen to event range incidents if incident name is configured
165 ATH_CHECK( m_incidentSvc.retrieve() );
166 if( !m_outSeqSvc->incidentName().empty() ) {
167 // use priority 95 to make sure the Output Sequencer goes first (it has priority 100)
168 m_incidentSvc->addListener(this, IncidentType::BeginProcessing, 95);
169 m_incidentSvc->addListener(this, IncidentType::EndProcessing, 95);
170 }
171
172 // Check compression settings and print some information about the configuration
173 // Both should be between [5, 23] and high compression should be < low compression
175 ATH_MSG_INFO(std::format("Float compression mantissa bits for high compression "
176 "({}) is outside the allowed range of [5, 23].",
177 m_compressionBitsHigh.toString()));
178 ATH_MSG_INFO("Setting it to the appropriate limit.");
180 }
182 ATH_MSG_INFO(std::format("Float compression mantissa bits for low compression "
183 "({}) is outside the allowed range of [5, 23].",
184 m_compressionBitsLow.toString()));
185 ATH_MSG_INFO("Setting it to the appropriate limit.");
187 }
189 ATH_MSG_ERROR(std::format("Float compression mantissa bits for low compression "
190 "({}) is lower than or equal to high compression "
191 "({})! Please check the configuration! ",
192 m_compressionBitsLow.toString(),
193 m_compressionBitsHigh.toString()));
194 return StatusCode::FAILURE;
195 }
196 if(m_compressionListHigh.value().empty() && m_compressionListLow.value().empty()) {
197 ATH_MSG_VERBOSE("Both high and low float compression lists are empty. Float compression will NOT be applied.");
198 } else {
199 ATH_MSG_INFO("Either high or low (or both) float compression lists are defined. Float compression will be applied.");
200 ATH_MSG_INFO(std::format("High compression will use {} mantissa bits, and "
201 "low compression will use {} mantissa bits.",
202 m_compressionBitsHigh.toString(),
203 m_compressionBitsLow.toString()));
204 }
205
206 // Setup stream name
207 if (m_streamName.empty()) {
208 m_streamName.setValue(this->name());
209 }
210
211 // Set SG key for selected variable information.
212 m_selVetoesKey = std::format("SelectionVetoes_{}", m_streamName.toString());
213 ATH_CHECK( m_selVetoesKey.initialize() );
214
215 m_compInfoKey = std::format("CompressionInfo_{}", m_streamName.toString());
216 ATH_CHECK( m_compInfoKey.initialize() );
217
218 ATH_MSG_DEBUG("End initialize");
219 return StatusCode::SUCCESS;
220}
#define ATH_MSG_FATAL(x)
#define ATH_MSG_VERBOSE(x)
StringProperty m_streamName
Stream name (defaults to algorithm name)
StringArrayProperty m_transientItems
List of items that are known to be present in the transient store (and hence we can make input depend...
ServiceHandle< IClassIDSvc > m_pCLIDSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
ServiceHandle< IDictLoaderSvc > m_dictLoader
StringProperty m_persName
Name of the persistency service capable to write data from the store.
void loadDict(CLID clid)
Helper function to load dictionaries (both transient and persistent) for a given type.
ServiceHandle< ITPCnvSvc > m_tpCnvSvc
StringArrayProperty m_metadataItemList
Vector of item names.
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
BooleanProperty m_extendProvenanceRecord
Set to false to omit adding the current DataHeader into the DataHeader history This will cause the in...
std::atomic< int > m_events
Number of events written to this output stream.
virtual StatusCode initialize()

◆ io_finalize()

StatusCode AthenaOutputStream::io_finalize ( )
overridevirtual

Definition at line 907 of file AthenaOutputStream.cxx.

907 {
908 ATH_MSG_INFO("I/O finalization...");
909 for (auto& tool : m_helperTools) {
910 if (!tool->preFinalize().isSuccess()) {
911 ATH_MSG_ERROR("Cannot finalize helper tool");
912 }
913 }
914 const Incident metaDataStopIncident(name(), "MetaDataStop");
915 this->handle(metaDataStopIncident);
916 m_incidentSvc->removeListener(this, "MetaDataStop");
917 if (m_dataStore->clearStore().isFailure()) {
918 ATH_MSG_WARNING("Cannot clear the DataStore");
919 }
920 return StatusCode::SUCCESS;
921}

◆ io_reinit()

StatusCode AthenaOutputStream::io_reinit ( )
overridevirtual

Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon fork(2))

Definition at line 893 of file AthenaOutputStream.cxx.

893 {
894 ATH_MSG_INFO("I/O reinitialization...");
895 m_incidentSvc->removeListener(this, "MetaDataStop"); // Remove any existing listener to avoid handling the incident multiple times
896 m_incidentSvc->addListener(this, "MetaDataStop", 50);
897 for (auto& tool : m_helperTools) {
898 if (!tool->postInitialize().isSuccess()) {
899 ATH_MSG_ERROR("Cannot initialize helper tool");
900 }
901 }
902 return StatusCode::SUCCESS;
903}

◆ itemListHandler()

void AthenaOutputStream::itemListHandler ( Gaudi::Details::PropertyBase & )
protected

Handler for ItemNames Property.

Definition at line 863 of file AthenaOutputStream.cxx.

863 {
864 // Assuming concrete SG::Folder also has an itemList property
865 IProperty *pAsIProp(nullptr);
866 if ((m_p2BWritten.retrieve()).isFailure() ||
867 nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
868 (pAsIProp->setProperty(m_itemList)).isFailure()) {
869 throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
870 }
871}

◆ loadDict()

void AthenaOutputStream::loadDict ( CLID clid)
private

Helper function to load dictionaries (both transient and persistent) for a given type.

We want to to this explicitly during initialization to avoid sporadic failures seen loading dictionaries while multiple threads are running. See ATEAM-697 and ATEAM-749.

Definition at line 928 of file AthenaOutputStream.cxx.

929{
930 m_dictLoader->load_type (clid);
931
932 // Also load the persistent class dictionary, if applicable.
933 std::unique_ptr<ITPCnvBase> tpcnv = m_tpCnvSvc->t2p_cnv_unique (clid);
934 if (tpcnv) {
935 m_dictLoader->load_type (tpcnv->persistentTInfo());
936 }
937}

◆ selectedObjects()

IDataSelector * AthenaOutputStream::selectedObjects ( )
inline

Return the list of selected objects.

Definition at line 85 of file AthenaOutputStream.h.

85 {
86 return &m_objects;
87 }

◆ simpleMatch()

bool AthenaOutputStream::simpleMatch ( const std::string & pattern,
const std::string & text )
private

Glob-style matcher, where the only meta-character is '*'.

Definition at line 940 of file AthenaOutputStream.cxx.

941 {
942 size_t pi = 0, ti = 0, star = std::string::npos, match = 0;
943 while (ti < text.size()) {
944 if (pi < pattern.size() && (pattern[pi] == text[ti] || pattern[pi] == '*')) {
945 if (pattern[pi] == '*') { star = pi++; match = ti; }
946 else { ++pi; ++ti; }
947 } else if (star != std::string::npos) {
948 pi = star + 1;
949 ti = ++match;
950 } else return false;
951 }
952 while (pi < pattern.size() && pattern[pi] == '*') ++pi;
953 return pi == pattern.size();
954}
#define pi

◆ write()

StatusCode AthenaOutputStream::write ( )
virtual

Stream the data.

Definition at line 442 of file AthenaOutputStream.cxx.

442 {
443 bool failed = false;
444 IAthenaOutputStreamTool* streamer = &*m_streamer;
445 std::string outputFN;
446
447 std::unique_lock<mutex_t> lock(m_mutex);
448 outputFN = m_outSeqSvc->buildSequenceFileName( m_outputName );
449
450 // Handle Event Ranges
451 if( m_outSeqSvc->inUse() and m_outSeqSvc->inConcurrentEventsMode() ) {
452 ATH_MSG_DEBUG(std::format("Writing event sequence to {}", outputFN));
453 streamer = m_streamerMap[ outputFN ].get();
454 if( !streamer ) {
455 // new range, needs a new streamer tool
456 IAlgTool* st = AlgTool::Factory::create( m_streamer->type(), m_streamer->type(), m_streamer->name(), this ).release();
457 st->addRef();
458 streamer = dynamic_cast<IAthenaOutputStreamTool*>( st );
459 IProperty *mstreamer_props = dynamic_cast<IProperty*> (&*m_streamer);
460 IProperty *streamer_props = dynamic_cast<IProperty*> (&*streamer);
461 if (!mstreamer_props || !streamer_props) {
462 ATH_MSG_FATAL("Cannot cast streamer to IProperty");
463 return StatusCode::FAILURE;
464 }
465 for ( const auto& prop : mstreamer_props->getProperties() ) {
466 ATH_CHECK( streamer_props->setProperty( *prop ) );
467 }
468 if( !streamer or streamer->initialize().isFailure()
469 or streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord).isFailure() ) {
470 ATH_MSG_FATAL(std::format("Unable to initialize OutputStreamTool for {}", outputFN));
471 return StatusCode::FAILURE;
472 }
473 m_streamerMap[ outputFN ].reset( streamer );
474 }
475 }
476
477 // Clear any previously existing item list
478 // and collect all objects that are asked to be written out
481
482 // keep a local copy of the object lists so they are not overwritten when we release the lock
483 IDataSelector objects = std::move( m_objects );
484 IDataSelector altObjects = std::move( m_altObjects );
485 std::vector<std::unique_ptr<DataObject> > ownedObjects = std::move( m_ownedObjects );
486
487 // prepare before releasing lock because m_outputAttributes change in metadataStop
488 const std::string connectStr = outputFN + m_outputAttributes;
489
490 for (auto& tool : m_helperTools) {
491 ATH_CHECK( tool->preStream() );
492 }
493
494 // MN: would be nice to release the Stream lock here
495 // lock.unlock();
496
497 // Connect the output file to the service
498 if (!streamer->connectOutput(connectStr).isSuccess()) {
499 ATH_MSG_FATAL("Could not connectOutput");
500 return StatusCode::FAILURE;
501 }
502 ATH_MSG_DEBUG(std::format("connectOutput done for {}", outputFN));
503 StatusCode currentStatus = streamer->streamObjects(objects, connectStr);
504 // Do final check of streaming
505 if (!currentStatus.isSuccess()) {
506 if (!currentStatus.isRecoverable()) {
507 ATH_MSG_FATAL("streamObjects failed.");
508 failed = true;
509 } else {
510 ATH_MSG_DEBUG("streamObjects failed.");
511 }
512 }
513 bool doCommit = false;
514 if (!streamer->commitOutput(doCommit).isSuccess()) {
515 ATH_MSG_FATAL("commitOutput failed.");
516 failed = true;
517 }
518 if (failed) {
519 return(StatusCode::FAILURE);
520 }
521 m_events++;
522 return(StatusCode::SUCCESS);
523}
void clearSelection()
Clear list of selected objects.
StatusCode collectAllObjects()
Collect data objects for output streamer list.
std::string m_outputAttributes
Output attributes.
virtual StatusCode connectServices(const std::string &dataStore, const std::string &cnvSvc, bool extendProvenenceRecord=false)=0
Specify which data store and conversion service to use and whether to extend provenence Only use if o...
virtual StatusCode streamObjects(const TypeKeyPairs &typeKeys, const std::string &outputName="")=0
virtual StatusCode connectOutput(const std::string &outputName="")=0
Connect to the output stream Must connectOutput BEFORE streaming Only specify "outputName" if one wan...
virtual StatusCode commitOutput(bool doCommit=false)=0
Commit the output stream after having streamed out objects Must commitOutput AFTER streaming.
::StatusCode StatusCode
StatusCode definition for legacy code.

◆ writeMetaData()

void AthenaOutputStream::writeMetaData ( const std::string & outputFN = "")
private

Write MetaData for this stream (by default) or for a substream outputFN (in ES mode)

Definition at line 318 of file AthenaOutputStream.cxx.

319{
320 // use main stream tool by default, or per outputFile in ES mode
321 IAthenaOutputStreamTool* streamer = outputFN.empty()? &*m_streamer : m_streamerMap[outputFN].get();
322
323 for (auto& tool : m_helperTools) {
324 if (!tool->preFinalize().isSuccess()) {
325 throw GaudiException("Cannot finalize helper tool", name(), StatusCode::FAILURE);
326 }
327 }
328 if( m_metaDataSvc->prepareOutput(outputFN).isFailure() ) {
329 throw GaudiException("Failed on MetaDataSvc prepareOutput", name(), StatusCode::FAILURE);
330 }
331 // lock all metadata to prevent updates during writing
332 MetaDataSvc::ToolLockGuard tool_guard( *m_metaDataSvc );
333
334 // Prepare the WriteDataHeaderForms incident
335 std::string DHFWriteIncidentfileName = m_outSeqSvc->buildSequenceFileName(m_outputName);
336 // remove technology from the name
337 size_t pos = DHFWriteIncidentfileName.find(':');
338 if( pos != std::string::npos ) DHFWriteIncidentfileName = DHFWriteIncidentfileName.substr(pos+1);
339 FileIncident incident(name(), "WriteDataHeaderForms", DHFWriteIncidentfileName);
340 m_incidentSvc->fireIncident(incident);
341
342 ATH_MSG_DEBUG("metadataItemList: " << m_metadataItemList.value() );
343 if (!m_metadataItemList.value().empty()) {
345 StatusCode status = streamer->connectServices(m_metadataStore.typeAndName(), m_persName, false);
346 if (status.isFailure()) {
347 throw GaudiException("Unable to connect metadata services", name(), StatusCode::FAILURE);
348 }
349 m_outputAttributes = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
350 m_p2BWritten->clear();
351 IProperty *pAsIProp(nullptr);
352 if ((m_p2BWritten.retrieve()).isFailure() ||
353 nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
354 (pAsIProp->setProperty("ItemList", m_metadataItemList.toString())).isFailure()) {
355 throw GaudiException("Folder property [metadataItemList] not found", name(), StatusCode::FAILURE);
356 }
357 if (write().isFailure()) {
358 throw GaudiException("Cannot write metadata", name(), StatusCode::FAILURE);
359 }
360 FileIncident incident(name(), "WriteDataHeaderForms", DHFWriteIncidentfileName + m_outputAttributes);
361 m_incidentSvc->fireIncident(incident);
362
363 m_outputAttributes.clear();
366 if (status.isFailure()) {
367 throw GaudiException("Unable to re-connect services", name(), StatusCode::FAILURE);
368 }
369 m_p2BWritten->clear();
370 if ((pAsIProp->setProperty(m_itemList)).isFailure()) {
371 throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
372 }
373 ATH_MSG_DEBUG(std::format("Metadata items written: {}", m_metadataItemList.value().size()));
374 }
375}
status
Definition merge.py:16

Member Data Documentation

◆ m_altObjects

IDataSelector AthenaOutputStream::m_altObjects
protected

Objects overridden by ‘exact’ handling.

Definition at line 175 of file AthenaOutputStream.h.

◆ m_CLIDKeyPairs

std::multimap<CLID,std::string> AthenaOutputStream::m_CLIDKeyPairs
protected

Map of (clid,key) pairs to be excluded (comes from m_excludeList)

Definition at line 169 of file AthenaOutputStream.h.

◆ m_compInfoKey

SG::WriteHandleKey<SG::CompressionInfo> AthenaOutputStream::m_compInfoKey { this, "CompInfoKey", "" }
private

Key used for recording lossy float compressed variable information to the event store.

Definition at line 227 of file AthenaOutputStream.h.

228{ this, "CompInfoKey", "" };

◆ m_compressionBitsHigh

UnsignedIntegerProperty AthenaOutputStream::m_compressionBitsHigh {this, "CompressionBitsHigh", 7, "Lossy float compression bits (high)"}
protected

Number of mantissa bits in the float compression.

Definition at line 131 of file AthenaOutputStream.h.

131{this, "CompressionBitsHigh", 7, "Lossy float compression bits (high)"};

◆ m_compressionBitsLow

UnsignedIntegerProperty AthenaOutputStream::m_compressionBitsLow {this, "CompressionBitsLow", 15, "Lossy float compression bits (low)"}
protected

Number of mantissa bits in the float compression.

Definition at line 134 of file AthenaOutputStream.h.

134{this, "CompressionBitsLow", 15, "Lossy float compression bits (low)"};

◆ m_compressionDecoderHigh

ToolHandle<SG::IFolder> AthenaOutputStream::m_compressionDecoderHigh
protected

The top-level folder with items to be compressed high.

Definition at line 160 of file AthenaOutputStream.h.

◆ m_compressionDecoderLow

ToolHandle<SG::IFolder> AthenaOutputStream::m_compressionDecoderLow
protected

The top-level folder with items to be compressed low.

Definition at line 163 of file AthenaOutputStream.h.

◆ m_compressionListHigh

StringArrayProperty AthenaOutputStream::m_compressionListHigh {this, "CompressionListHigh", {}, "Lossy float compression list (high)"}
protected

Vector of item names.

Definition at line 125 of file AthenaOutputStream.h.

125{this, "CompressionListHigh", {}, "Lossy float compression list (high)"};

◆ m_compressionListLow

StringArrayProperty AthenaOutputStream::m_compressionListLow {this, "CompressionListLow", {}, "Lossy float compression list (low)"}
protected

Vector of item names.

Definition at line 128 of file AthenaOutputStream.h.

128{this, "CompressionListLow", {}, "Lossy float compression list (low)"};

◆ m_currentStore

ServiceHandle<StoreGateSvc>* AthenaOutputStream::m_currentStore
protected

Definition at line 101 of file AthenaOutputStream.h.

◆ m_dataStore

ServiceHandle<StoreGateSvc> AthenaOutputStream::m_dataStore {this, "Store", "StoreGateSvc/StoreGateSvc", "Handle to event store"}
protected

Handle to the StoreGateSvc store where the data we want to write out resides.

Definition at line 99 of file AthenaOutputStream.h.

99{this, "Store", "StoreGateSvc/StoreGateSvc", "Handle to event store"};

◆ m_dictLoader

ServiceHandle<IDictLoaderSvc> AthenaOutputStream::m_dictLoader {this, "AthDictLoaderSvc", "AthDictLoaderSvc"}
protected

Definition at line 105 of file AthenaOutputStream.h.

105{this, "AthDictLoaderSvc", "AthDictLoaderSvc"};

◆ m_events

std::atomic<int> AthenaOutputStream::m_events {0}
protected

Number of events written to this output stream.

Definition at line 192 of file AthenaOutputStream.h.

192{0};

◆ m_extendProvenanceRecord

BooleanProperty AthenaOutputStream::m_extendProvenanceRecord {this, "ExtendProvenanceRecord", true, "Extend provenance record"}
protected

Set to false to omit adding the current DataHeader into the DataHeader history This will cause the input file to be neglected for back navigation (replace mode).

Definition at line 151 of file AthenaOutputStream.h.

151{this, "ExtendProvenanceRecord", true, "Extend provenance record"};

◆ m_forceRead

BooleanProperty AthenaOutputStream::m_forceRead {this, "ForceRead", true, "Force read data objects in ItemList"}
protected

set to true to force read of data objects in item list

Definition at line 147 of file AthenaOutputStream.h.

147{this, "ForceRead", true, "Force read data objects in ItemList"};

◆ m_helperTools

ToolHandleArray<IAthenaOutputTool> AthenaOutputStream::m_helperTools {this, "HelperTools", {}, "List of AlgTools used by this stream"}
protected

vector of AlgTools that that are executed by this stream

Definition at line 186 of file AthenaOutputStream.h.

186{this, "HelperTools", {}, "List of AlgTools used by this stream"};

◆ m_incidentSvc

ServiceHandle<IIncidentSvc> AthenaOutputStream::m_incidentSvc {this, "IncidentSvc", "IncidentSvc"}
protected

Definition at line 107 of file AthenaOutputStream.h.

107{this, "IncidentSvc", "IncidentSvc"};

◆ m_itemList

StringArrayProperty AthenaOutputStream::m_itemList {this, "ItemList", {}, "List of items to write", "OutputStreamItemList"}
protected

Vector of item names.

Definition at line 115 of file AthenaOutputStream.h.

115{this, "ItemList", {}, "List of items to write", "OutputStreamItemList"};

◆ m_itemListFromTool

BooleanProperty AthenaOutputStream::m_itemListFromTool {this, "TakeItemsFromInput", false, "Write everything in input DataHeader to output"}
protected

Set to write out everything in input DataHeader.

Definition at line 154 of file AthenaOutputStream.h.

154{this, "TakeItemsFromInput", false, "Write everything in input DataHeader to output"};

◆ m_keepProvenances

StringProperty AthenaOutputStream::m_keepProvenances
protected
Initial value:
{this, "KeepProvenanceTagsRegEx", {".*"},
"RegEx pattern to select processing tags for which DataHeader should retain provenances"}

Provenance record selection.

Definition at line 121 of file AthenaOutputStream.h.

121 {this, "KeepProvenanceTagsRegEx", {".*"},
122 "RegEx pattern to select processing tags for which DataHeader should retain provenances"};

◆ m_metadataItemList

StringArrayProperty AthenaOutputStream::m_metadataItemList {this, "MetadataItemList", {}, "List of metadata items to write","OutputStreamItemList"}
protected

Vector of item names.

Definition at line 118 of file AthenaOutputStream.h.

118{this, "MetadataItemList", {}, "List of metadata items to write","OutputStreamItemList"};

◆ m_metadataStore

ServiceHandle<StoreGateSvc> AthenaOutputStream::m_metadataStore {this, "MetadataStore", "StoreGateSvc/MetaDataStore", "Handle to metadata store"}
protected

Definition at line 100 of file AthenaOutputStream.h.

100{this, "MetadataStore", "StoreGateSvc/MetaDataStore", "Handle to metadata store"};

◆ m_metaDataSvc

ServiceHandle<MetaDataSvc> AthenaOutputStream::m_metaDataSvc {this, "MetaDataSvc", "MetaDataSvc"}
protected

Handles to all the necessary services.

Definition at line 104 of file AthenaOutputStream.h.

104{this, "MetaDataSvc", "MetaDataSvc"};

◆ m_mutex

mutex_t AthenaOutputStream::m_mutex
protected

mutex for this Stream write() and handle() methods

Definition at line 205 of file AthenaOutputStream.h.

◆ m_objects

IDataSelector AthenaOutputStream::m_objects
protected

Collection of objects being selected.

Definition at line 172 of file AthenaOutputStream.h.

◆ m_outputAttributes

std::string AthenaOutputStream::m_outputAttributes
private

Output attributes.

Definition at line 231 of file AthenaOutputStream.h.

◆ m_outputName

StringProperty AthenaOutputStream::m_outputName {this, "OutputFile", "DidNotNameOutput.root", "Name of the output file"}
protected

Name of the output file.

Definition at line 141 of file AthenaOutputStream.h.

141{this, "OutputFile", "DidNotNameOutput.root", "Name of the output file"};

◆ m_outSeqSvc

ServiceHandle<OutputStreamSequencerSvc> AthenaOutputStream::m_outSeqSvc {this, "OutputStreamSequencerSvc", "OutputStreamSequencerSvc"}
protected

Definition at line 109 of file AthenaOutputStream.h.

109{this, "OutputStreamSequencerSvc", "OutputStreamSequencerSvc"};

◆ m_ownedObjects

std::vector<std::unique_ptr<DataObject> > AthenaOutputStream::m_ownedObjects
protected

Collection of DataObject instances owned by this service.

FIXME: it would be simpler to just have m_objects be a vector of DataObjectSharedPtr<DataObject>, but that implies interface changes.

Definition at line 180 of file AthenaOutputStream.h.

◆ m_p2BWritten

ToolHandle<SG::IFolder> AthenaOutputStream::m_p2BWritten
protected

The top-level folder with items to be written.

Definition at line 157 of file AthenaOutputStream.h.

◆ m_pCLIDSvc

ServiceHandle<IClassIDSvc> AthenaOutputStream::m_pCLIDSvc {this, "ClassIDSvc", "ClassIDSvc"}
protected

Definition at line 108 of file AthenaOutputStream.h.

108{this, "ClassIDSvc", "ClassIDSvc"};

◆ m_persName

StringProperty AthenaOutputStream::m_persName {this, "EvtConversionSvc", "EventPersistencySvc", "Name of the persistency service writing data"}
protected

Name of the persistency service capable to write data from the store.

Definition at line 144 of file AthenaOutputStream.h.

144{this, "EvtConversionSvc", "EventPersistencySvc", "Name of the persistency service writing data"};

◆ m_rangeIDforRangeFN

std::map< std::string, std::string > AthenaOutputStream::m_rangeIDforRangeFN
protected

map of RangeIDs (as used by the Sequencer) for each Range filename generated

Definition at line 199 of file AthenaOutputStream.h.

◆ m_selVetoesKey

SG::WriteHandleKey<SG::SelectionVetoes> AthenaOutputStream::m_selVetoesKey { this, "SelVetoesKey", "" }
private

Key used for recording selected dynamic variable information to the event store.

Definition at line 222 of file AthenaOutputStream.h.

223{ this, "SelVetoesKey", "" };

◆ m_slotRangeMap

std::map< unsigned, std::string > AthenaOutputStream::m_slotRangeMap
protected

map of filenames assigned to active slots

Definition at line 196 of file AthenaOutputStream.h.

◆ m_streamer

ToolHandle<IAthenaOutputStreamTool> AthenaOutputStream::m_streamer
protected

pointer to AthenaOutputStreamTool

Definition at line 183 of file AthenaOutputStream.h.

◆ m_streamerMap

std::map< std::string, std::unique_ptr<IAthenaOutputStreamTool> > AthenaOutputStream::m_streamerMap
protected

map of streamerTools handling event ranges in MT

Definition at line 202 of file AthenaOutputStream.h.

◆ m_streamName

StringProperty AthenaOutputStream::m_streamName {this, "StreamName", "", "Name of the output stream"}
protected

Stream name (defaults to algorithm name)

Definition at line 112 of file AthenaOutputStream.h.

112{this, "StreamName", "", "Name of the output stream"};

◆ m_tpCnvSvc

ServiceHandle<ITPCnvSvc> AthenaOutputStream::m_tpCnvSvc {this, "AthTPCnvSvc", "AthTPCnvSvc"}
protected

Definition at line 106 of file AthenaOutputStream.h.

106{this, "AthTPCnvSvc", "AthTPCnvSvc"};

◆ m_transient

ToolHandle<SG::IFolder> AthenaOutputStream::m_transient
protected

Decoded list of transient ids.

Definition at line 166 of file AthenaOutputStream.h.

◆ m_transientItems

StringArrayProperty AthenaOutputStream::m_transientItems {this, "TransientItems", {}, "Transient item list"}
protected

List of items that are known to be present in the transient store (and hence we can make input dependencies on them).

Definition at line 138 of file AthenaOutputStream.h.

138{this, "TransientItems", {}, "Transient item list"};

◆ m_writeMetadataAndDisconnect

bool AthenaOutputStream::m_writeMetadataAndDisconnect {false}
protected

Definition at line 189 of file AthenaOutputStream.h.

189{false};

The documentation for this class was generated from the following files: