ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaOutputStream Class Reference

algorithm that marks for write data objects in SG More...

#include <AthenaOutputStream.h>

Inheritance diagram for AthenaOutputStream:

Public Types

typedef std::vector< SG::DataProxy * > Items
 typedefs
typedef std::vector< std::pair< std::string, std::string > > TypeKeyPairs
typedef std::recursive_mutex mutex_t

Public Member Functions

 AthenaOutputStream (const std::string &name, ISvcLocator *pSvcLocator)
 Standard algorithm Constructor.
virtual ~AthenaOutputStream ()
 Standard Destructor.

implement IAlgorithm

ServiceHandle< StoreGateSvc > m_dataStore {this, "Store", "StoreGateSvc/StoreGateSvc", "Handle to event store"}
 Handle to the StoreGateSvc store where the data we want to write out resides.
ServiceHandle< StoreGateSvc > m_metadataStore {this, "MetadataStore", "StoreGateSvc/MetaDataStore", "Handle to metadata store"}
ServiceHandle< StoreGateSvc > * m_currentStore
ServiceHandle< MetaDataSvc > m_metaDataSvc {this, "MetaDataSvc", "MetaDataSvc"}
 Handles to all the necessary services.
ServiceHandle< IDictLoaderSvc > m_dictLoader {this, "AthDictLoaderSvc", "AthDictLoaderSvc"}
ServiceHandle< ITPCnvSvc > m_tpCnvSvc {this, "AthTPCnvSvc", "AthTPCnvSvc"}
ServiceHandle< IIncidentSvc > m_incidentSvc {this, "IncidentSvc", "IncidentSvc"}
ServiceHandle< IClassIDSvc > m_pCLIDSvc {this, "ClassIDSvc", "ClassIDSvc"}
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc {this, "OutputStreamSequencerSvc", "OutputStreamSequencerSvc"}
StringProperty m_streamName {this, "StreamName", "", "Name of the output stream"}
 Stream name (defaults to algorithm name).
StringArrayProperty m_itemList {this, "ItemList", {}, "List of items to write", "OutputStreamItemList"}
 Vector of item names.
StringArrayProperty m_metadataItemList {this, "MetadataItemList", {}, "List of metadata items to write","OutputStreamItemList"}
 Vector of item names.
StringProperty m_keepProvenances
 Provenance record selection.
StringArrayProperty m_compressionListHigh {this, "CompressionListHigh", {}, "Lossy float compression list (high)"}
 Vector of item names.
StringArrayProperty m_compressionListLow {this, "CompressionListLow", {}, "Lossy float compression list (low)"}
 Vector of item names.
UnsignedIntegerProperty m_compressionBitsHigh {this, "CompressionBitsHigh", 7, "Lossy float compression bits (high)"}
 Number of mantissa bits in the float compression.
UnsignedIntegerProperty m_compressionBitsLow {this, "CompressionBitsLow", 15, "Lossy float compression bits (low)"}
 Number of mantissa bits in the float compression.
StringArrayProperty m_transientItems {this, "TransientItems", {}, "Transient item list"}
 List of items that are known to be present in the transient store (and hence we can make input dependencies on them).
StringProperty m_outputName {this, "OutputFile", "DidNotNameOutput.root", "Name of the output file"}
 Name of the output file.
StringProperty m_persName {this, "EvtConversionSvc", "EventPersistencySvc", "Name of the persistency service writing data"}
 Name of the persistency service capable of writing data from the store.
BooleanProperty m_forceRead {this, "ForceRead", true, "Force read data objects in ItemList"}
 set to true to force read of data objects in item list
BooleanProperty m_extendProvenanceRecord {this, "ExtendProvenanceRecord", true, "Extend provenance record"}
 Set to false to omit adding the current DataHeader into the DataHeader history. This will cause the input file to be neglected for back navigation (replace mode).
BooleanProperty m_itemListFromTool {this, "TakeItemsFromInput", false, "Write everything in input DataHeader to output"}
 Set to write out everything in input DataHeader.
ToolHandle< SG::IFolder > m_p2BWritten
 The top-level folder with items to be written.
ToolHandle< SG::IFolder > m_compressionDecoderHigh
 The top-level folder with items to be compressed high.
ToolHandle< SG::IFolder > m_compressionDecoderLow
 The top-level folder with items to be compressed low.
ToolHandle< SG::IFolder > m_transient
 Decoded list of transient ids.
std::multimap< CLID, std::string > m_CLIDKeyPairs
 Map of (clid,key) pairs to be excluded (comes from m_excludeList).
IDataSelector m_objects
 Collection of objects being selected.
IDataSelector m_altObjects
 Objects overridden by `exact' handling.
std::vector< std::unique_ptr< DataObject > > m_ownedObjects
 Collection of DataObject instances owned by this service.
ToolHandle< IAthenaOutputStreamTool > m_streamer
 pointer to AthenaOutputStreamTool
ToolHandleArray< IAthenaOutputTool > m_helperTools {this, "HelperTools", {}, "List of AlgTools used by this stream"}
 vector of AlgTools that are executed by this stream
bool m_writeMetadataAndDisconnect {false}
std::atomic< int > m_events {0}
 Number of events written to this output stream.
std::map< unsigned, std::string > m_slotRangeMap
 map of filenames assigned to active slots
std::map< std::string, std::string > m_rangeIDforRangeFN
 map of RangeIDs (as used by the Sequencer) for each Range filename generated
std::map< std::string, std::unique_ptr< IAthenaOutputStreamTool > > m_streamerMap
 map of streamerTools handling event ranges in MT
mutex_t m_mutex
 mutex for this Stream write() and handle() methods
SG::WriteHandleKey< SG::SelectionVetoes > m_selVetoesKey { this, "SelVetoesKey", "" }
 Key used for recording selected dynamic variable information to the event store.
SG::WriteHandleKey< SG::CompressionInfo > m_compInfoKey { this, "CompInfoKey", "" }
 Key used for recording lossy float compressed variable information to the event store.
std::string m_outputAttributes
 Output attributes.
virtual StatusCode initialize () override
virtual StatusCode finalize () override
virtual StatusCode execute () override
virtual StatusCode write (const EventContext &ctx)
 Stream the data.
void clearSelection ()
 Clear list of selected objects.
StatusCode collectAllObjects (const EventContext &ctx)
 Collect data objects for output streamer list.
IDataSelector * selectedObjects ()
 Return the list of selected objects.
virtual void handle (const Incident &incident) override
 Incident service handle listening for MetaDataStop.
virtual StatusCode io_reinit () override
 Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon fork(2)).
virtual StatusCode io_finalize () override
void itemListHandler (Gaudi::Details::PropertyBase &)
 Handler for ItemNames Property.
void excludeListHandler (Gaudi::Details::PropertyBase &)
 Handler for ItemNames Property.
void compressionListHandlerHigh (Gaudi::Details::PropertyBase &)
 Handler for ItemNames Property.
void compressionListHandlerLow (Gaudi::Details::PropertyBase &)
 Handler for ItemNames Property.
StatusCode addItemObjects (const EventContext &, const SG::FolderItem &, SG::SelectionVetoes &vetoes, SG::CompressionInfo &compInfo)
 Add item data objects to output streamer list.
void handleVariableSelection (const SG::IConstAuxStore &auxstore, SG::DataProxy &itemProxy, const std::string &aux_attr, SG::SelectionVetoes &vetoes) const
 Here we build the vetoed AuxIDs.
void writeMetaData (const EventContext &ctx, const std::string &outputFN="")
 Write MetaData for this stream (by default) or for a substream outputFN (in ES mode).
std::set< std::string > buildCompressionSet (const ToolHandle< SG::IFolder > &handle, const CLID &item_id, const std::string &item_key) const
 Helper function for building the compression lists.
void finalizeRange (const EventContext &ctx, const std::string &rangeFN)
void loadDict (CLID clid)
 Helper function to load dictionaries (both transient and persistent) for a given type.
bool simpleMatch (const std::string &pattern, const std::string &text)
 Glob-style matcher, where the only meta-character is '*'.

Detailed Description

algorithm that marks for write data objects in SG

Author
srini.nosp@m.r@bn.nosp@m.l.gov

Definition at line 53 of file AthenaOutputStream.h.

Member Typedef Documentation

◆ Items

typedefs

Definition at line 58 of file AthenaOutputStream.h.

◆ mutex_t

typedef std::recursive_mutex AthenaOutputStream::mutex_t

Definition at line 60 of file AthenaOutputStream.h.

◆ TypeKeyPairs

typedef std::vector<std::pair<std::string, std::string> > AthenaOutputStream::TypeKeyPairs

Definition at line 59 of file AthenaOutputStream.h.

Constructor & Destructor Documentation

◆ AthenaOutputStream()

AthenaOutputStream::AthenaOutputStream ( const std::string & name,
ISvcLocator * pSvcLocator )

Standard algorithm Constructor.

Definition at line 43 of file AthenaOutputStream.cxx.

44 : base_class(name, pSvcLocator),
46 m_p2BWritten(std::format("SG::Folder/{}_TopFolder", name), this),
47 m_compressionDecoderHigh(std::format("SG::Folder/{}_compressed_high", name), this),
48 m_compressionDecoderLow(std::format("SG::Folder/{}_compressed_low", name), this),
49 m_transient(std::format("SG::Folder/{}_transient", name), this),
50 m_streamer(std::format("AthenaOutputStreamTool/{}Tool", name), this)
51{
52 // Ensure the service locater is good
53 assert(pSvcLocator);
54
55 // This property depends on the name that's known at construction time
56 // Therefore, do it the old fashioned way
57 declareProperty("WritingTool", m_streamer);
58
59 // Associate action handlers with the AcceptAlgs,
60 // RequireAlgs & VetoAlgs properties
61 m_itemList.declareUpdateHandler(&AthenaOutputStream::itemListHandler, this);
64}
ToolHandle< SG::IFolder > m_compressionDecoderLow
The top-level folder with items to be compressed low.
ToolHandle< SG::IFolder > m_p2BWritten
The top-level folder with items to be written.
StringArrayProperty m_compressionListLow
Vector of item names.
ToolHandle< IAthenaOutputStreamTool > m_streamer
pointer to AthenaOutputStreamTool
ToolHandle< SG::IFolder > m_compressionDecoderHigh
The top-level folder with items to be compressed high.
StringArrayProperty m_itemList
Vector of item names.
void compressionListHandlerLow(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
StringArrayProperty m_compressionListHigh
Vector of item names.
ServiceHandle< StoreGateSvc > * m_currentStore
ToolHandle< SG::IFolder > m_transient
Decoded list of transient ids.
ServiceHandle< StoreGateSvc > m_dataStore
Handle to the StoreGateSvc store where the data we want to write out resides.
void compressionListHandlerHigh(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
void itemListHandler(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.

◆ ~AthenaOutputStream()

AthenaOutputStream::~AthenaOutputStream ( )
virtual

Standard Destructor.

Definition at line 67 of file AthenaOutputStream.cxx.

67 {
68 // Clear the internal caches
69 m_streamerMap.clear();
70}
std::map< std::string, std::unique_ptr< IAthenaOutputStreamTool > > m_streamerMap
map of streamerTools handling event ranges in MT

Member Function Documentation

◆ addItemObjects()

StatusCode AthenaOutputStream::addItemObjects ( const EventContext & ctx,
const SG::FolderItem & item,
SG::SelectionVetoes & vetoes,
SG::CompressionInfo & compInfo )
private

Add item data objects to output streamer list.

Handle variable selections. Both variable selection and lossy float compression are limited to event data for the time being.

Definition at line 568 of file AthenaOutputStream.cxx.

572{
573 // anything after a dot is a list of dynamic Aux attributes, separated by dots
574 size_t dotpos = item.key().find('.');
575 std::string item_key, aux_attr;
576 if( dotpos != std::string::npos ) {
577 item_key = item.key().substr(0, dotpos+1);
578 aux_attr = item.key().substr(dotpos+1);
579 } else {
580 item_key = item.key();
581 }
582 CLID item_id = item.id();
583 ATH_MSG_DEBUG(std::format("addItemObjects({},\"{}\") called", item_id, item_key));
584 ATH_MSG_DEBUG(std::format(" Key:{}", item_key));
585 if( aux_attr.size() ) {
586 ATH_MSG_DEBUG(std::format(" Aux Attr:{}", aux_attr));
587 }
588
589 // Here we build the list of attributes for the lossy float compression
590 // Note that we do not allow m_compressionBitsHigh >= m_compressionBitsLow
591 // Otherwise is, in any case, a logical error and they'd potentially overwrite each other
592 std::map< unsigned int, std::set< std::string > > comp_attr_map;
593 comp_attr_map[ m_compressionBitsHigh ] = buildCompressionSet( m_compressionDecoderHigh, item_id, item_key );
594 comp_attr_map[ m_compressionBitsLow ] = buildCompressionSet( m_compressionDecoderLow, item_id, item_key );
595
596 // Print some debugging information regarding the lossy float compression configuration
597 for( const auto& it : comp_attr_map ) {
598 ATH_MSG_DEBUG(std::format(" Comp Attr {} with {} mantissa bits.", it.second.size(), it.first));
599 if ( it.second.size() > 0 ) {
600 for( const auto& attr : it.second ) {
601 ATH_MSG_DEBUG(std::format(" >> {}", attr));
602 }
603 }
604 }
605
606 // For MetaData objects of type T that are kept in MetaContainers get the MetaCont<T> ID
607 const CLID remapped_item_id = m_metaDataSvc->remapMetaContCLID( item_id );
609 SG::ProxyMap map;
610 bool gotProxies = false;
611 // Look for the clid in storegate
612 SG::DataProxy* match = (*m_currentStore)->proxy(remapped_item_id, item_key, true);
613 if (match != nullptr) {
614 map.insert({item_key, match});
615 iter = map.begin();
616 end = map.end();
617 gotProxies = true;
618 }
619 // Look for the clid in storegate
620 if (!gotProxies && ((*m_currentStore)->proxyRange(remapped_item_id, iter, end)).isSuccess()) {
621 gotProxies = true;
622 }
623 if (gotProxies) {
624 bool added = false, removed = false;
625 // Now loop over any found proxies
626 for (; iter != end; ++iter) {
627 SG::DataProxy* itemProxy(iter->second);
628 std::string proxyName = itemProxy->name();
629 std::string stream;
631 // only check metadata keys
632 stream = m_metaDataSvc->removeStreamFromKey(proxyName); // can modify proxyName
633 }
634 // Does this key match the proxy key name - allow for wildcarding and aliases
635 bool keyMatch = ( item_key == "*" ||
636 item_key == proxyName ||
637 itemProxy->hasAlias(item_key) );
638 if (!keyMatch) {
639 // For item list we currently allow wildcards ('*'), which has limited use, e.g.:
640 // xAOD::CutBookkeeperAuxContainer#IncompleteCutBookkeepers*Aux.
641 // Here we look for those few cases...
642 keyMatch = simpleMatch(item_key, proxyName);
643 ATH_MSG_DEBUG(std::format("Result of checking {} against {} to see if it matches is {}",
644 proxyName, item_key, keyMatch));
645 }
646
647 // Now check if this item is marked for another output stream, if so we reject it
648 // We also reject keys that are marked transient at this point
649 bool xkeyMatch = false;
650 if( (!stream.empty() and stream != m_outputName) || SG::isTransientKey(proxyName) ) {
651 // reject keys that are marked for a different output stream
652 ATH_MSG_DEBUG(std::format("Rejecting key: {} in output: {}", itemProxy->name(), m_outputName.toString()));
653 xkeyMatch = true;
654 }
655
656 // All right, it passes key match find in itemList, but not in excludeList
657 if (keyMatch && !xkeyMatch) {
658 if (m_forceRead && itemProxy->isValid()) {
659 if (nullptr == itemProxy->accessData()) {
660 ATH_MSG_ERROR(std::format(" Could not get data object for id {},\"{}\"", remapped_item_id, proxyName));
661 }
662 }
663 if (nullptr != itemProxy->object()) {
664 if( std::find(m_objects.begin(), m_objects.end(), itemProxy->object()) == m_objects.end() &&
665 std::find(m_altObjects.begin(), m_altObjects.end(), itemProxy->object()) == m_altObjects.end() )
666 {
667 if( item_id != remapped_item_id ) {
668 // For MetaCont<T>: -
669 // create a temporary DataObject for an entry in the container to pass to CnvSvc
670 DataBucketBase* dbb = static_cast<DataBucketBase*>( itemProxy->object() );
671 const MetaContBase* metaCont = static_cast<MetaContBase*>( dbb->cast( ClassID_traits<MetaContBase>::ID() ) );
672 void* obj = metaCont? metaCont->getAsVoid( m_outSeqSvc->currentRangeID(ctx) ) : nullptr;
673 if( obj ) {
674 auto altbucket = std::make_unique<AltDataBucket>(
675 obj, item_id, *CLIDRegistry::CLIDToTypeinfo(item_id), proxyName );
676 m_objects.push_back( altbucket.get() );
677 m_ownedObjects.push_back( std::move(altbucket) );
678 m_altObjects.push_back( itemProxy->object() ); // only for duplicate prevention
679 } else {
680 ATH_MSG_ERROR(std::format("Failed to retrieve object from MetaCont with key={}, for EventRangeID={}",
681 item_key, m_outSeqSvc->currentRangeID(ctx)));
682 return StatusCode::FAILURE;
683 }
684 } else if (item.exact()) {
685 // If the exact flag is set, make a new DataObject
686 // holding the object as the requested type.
687 DataBucketBase* dbb = dynamic_cast<DataBucketBase*> (itemProxy->object());
688 if (!dbb) std::abort();
689 void* ptr = dbb->cast (item_id);
690 if (!ptr) {
691 // Hard cast
692 ptr = dbb->object();
693 }
694 auto altbucket =
695 std::make_unique<AltDataBucket>
696 (ptr, item_id,
698 *itemProxy);
699 m_objects.push_back(altbucket.get());
700 m_ownedObjects.push_back (std::move(altbucket));
701 m_altObjects.push_back (itemProxy->object());
702 }
703 else
704 m_objects.push_back(itemProxy->object());
705 ATH_MSG_DEBUG(std::format(" Added object {},\"{}\"", item_id, proxyName));
706 }
707
711 if ((*m_currentStore)->storeID() == StoreID::EVENT_STORE &&
712 item_key.find( RootAuxDynIO::AUX_POSTFIX ) == ( item_key.size() - 4 )) {
713
714 const SG::IConstAuxStore* auxstore( nullptr );
715 try {
716 SG::fromStorable( itemProxy->object(), auxstore, true );
717 } catch( const std::exception& ) {
718 ATH_MSG_DEBUG(std::format("Error in casting object with CLID {} to SG::IConstAuxStore*", itemProxy->clID()));
719 auxstore = nullptr;
720 }
721
722 if (auxstore) {
723 handleVariableSelection (*auxstore, *itemProxy,
724 aux_attr, vetoes);
725
726 // Here comes the compression logic using ThinningInfo
727 // Get a hold of all AuxIDs for this store (static, dynamic etc.)
728 const SG::auxid_set_t allVars = auxstore->getAuxIDs();
729
730 // Get a handle on the compression information for this store
731 std::string key = item_key;
732 key.erase (key.size()-4, 4);
733
734 // Build the compression list, retrieve the relevant AuxIDs and
735 // store it in the relevant map that is going to be inserted into
736 // the ThinningCache later on by the ThinningCacheTool
737 xAOD::AuxCompression compression;
738 compression.setCompressedAuxIDs( comp_attr_map );
739 for( const auto& it : compression.getCompressedAuxIDs( allVars ) ) {
740 if( it.second.size() > 0 ) { // insert only if the set is non-empty
741 compInfo[ key ][ it.first ] = it.second;
742 ATH_MSG_DEBUG(std::format("Container {} has {} variables that'll be "
743 "lossy float compressed with {} mantissa bits",
744 key, it.second.size(), it.first));
745 }
746 } // End of loop over variables to be lossy float compressed
747 } // End of lossy float compression logic
748
749 }
750
751 added = true;
752 }
753 } else if (keyMatch && xkeyMatch) {
754 removed = true;
755 }
756 } // proxy loop
757 if (!added && !removed) {
758 ATH_MSG_DEBUG(std::format(" No object matching {},\"{}\" found", item_id, item_key));
759 } else if (removed) {
760 ATH_MSG_DEBUG(std::format(" Object being excluded based on property setting {},\"{}\". Skipping",
761 item_id, item_key));
762 }
763 } else {
764 ATH_MSG_DEBUG(std::format(" Failed to receive proxy iterators from StoreGate for {},\"{}\". Skipping",
765 item_id, item_key));
766 }
767 return StatusCode::SUCCESS;
768}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_DEBUG(x)
uint32_t CLID
The Class ID type.
IDataSelector m_objects
Collection of objects being selected.
ServiceHandle< StoreGateSvc > m_metadataStore
UnsignedIntegerProperty m_compressionBitsLow
Number of mantissa bits in the float compression.
UnsignedIntegerProperty m_compressionBitsHigh
Number of mantissa bits in the float compression.
ServiceHandle< MetaDataSvc > m_metaDataSvc
Handles to all the necessary services.
StringProperty m_outputName
Name of the output file.
std::set< std::string > buildCompressionSet(const ToolHandle< SG::IFolder > &handle, const CLID &item_id, const std::string &item_key) const
Helper function for building the compression lists.
void handleVariableSelection(const SG::IConstAuxStore &auxstore, SG::DataProxy &itemProxy, const std::string &aux_attr, SG::SelectionVetoes &vetoes) const
Here we build the vetoed AuxIDs.
IDataSelector m_altObjects
Objects overridden by `exact' handling.
BooleanProperty m_forceRead
set to true to force read of data objects in item list
std::vector< std::unique_ptr< DataObject > > m_ownedObjects
Collection of DataObject instances owned by this service.
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
bool simpleMatch(const std::string &pattern, const std::string &text)
Glob-style matcher, where the only meta-character is '*'.
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
virtual void * object()=0
T * cast(SG::IRegisterTransient *irt=0, bool isConst=true)
Return the contents of the DataBucket, converted to type T.
virtual void * getAsVoid(const SourceID &sid) const =0
CLID id() const
const std::string & key() const
bool exact() const
@ EVENT_STORE
Definition StoreID.h:26
virtual std::map< unsigned int, SG::auxid_set_t > getCompressedAuxIDs(const SG::auxid_set_t &fullset) const
Return those variables that are selected to be compressed per compression setting.
virtual void setCompressedAuxIDs(const std::map< unsigned int, std::set< std::string > > &attributes)
Set which variables should be compressed per compression setting.
bool match(std::string s1, std::string s2)
match the individual directories of two strings
Definition hcg.cxx:359
constexpr char AUX_POSTFIX[]
Common post-fix for the names of auxiliary containers in StoreGate.
std::map< std::string, DataProxy * > ProxyMap
Definition ProxyMap.h:22
bool fromStorable(DataObject *pDObj, T *&pTrans, bool quiet=false, IRegisterTransient *irt=0, bool isConst=true)
ProxyMap::const_iterator ConstProxyIterator
Definition ProxyMap.h:24
void * ptr(T *p)
Definition SGImplSvc.cxx:74

◆ buildCompressionSet()

std::set< std::string > AthenaOutputStream::buildCompressionSet ( const ToolHandle< SG::IFolder > & handle,
const CLID & item_id,
const std::string & item_key ) const
private

Helper function for building the compression lists.

Here we build the list of attributes for the float compression. CompressionList follows the same logic as the ItemList. We find the matching keys, read the string after "Aux.", tokenize by "." and build an std::set of these to be communicated to ThinningInfo elsewhere in the code.

Definition at line 776 of file AthenaOutputStream.cxx.

779{
780 // Create an empty result
781 std::set<std::string> result;
782
783 // Check the item is indeed Aux.
784 if(item_key.find("Aux.") == std::string::npos) {
785 return result;
786 }
787
788 // First the high compression list
789 for (const auto& iter : *handle) {
790 // First match the IDs for early rejection.
791 if (iter.id() != item_id) {
792 continue;
793 }
794 // Then find the compression item key and the compression list string
795 size_t seppos = iter.key().find('.');
796 std::string comp_item_key{""}, comp_str{""};
797 if(seppos != std::string::npos) {
798 comp_item_key = iter.key().substr(0, seppos+1);
799 comp_str = iter.key().substr(seppos+1);
800 } else {
801 comp_item_key = iter.key();
802 }
803 // Proceed only if the keys match and the
804 // compression list string is not empty
805 if (!comp_str.empty() && comp_item_key == item_key) {
806 std::stringstream ss(comp_str);
807 std::string attr;
808 while( std::getline(ss, attr, '.') ) {
809 result.insert(attr);
810 }
811 }
812 }
813
814 // All done, return the result
815 return result;
816}
static Double_t ss
virtual void handle(const Incident &incident) override
Incident service handle listening for MetaDataStop.

◆ clearSelection()

void AthenaOutputStream::clearSelection ( )

Clear list of selected objects.

Definition at line 527 of file AthenaOutputStream.cxx.

527 {
528 m_objects.clear();
529 m_ownedObjects.clear();
530 m_altObjects.clear();
531}

◆ collectAllObjects()

StatusCode AthenaOutputStream::collectAllObjects ( const EventContext & ctx)

Collect data objects for output streamer list.

Definition at line 534 of file AthenaOutputStream.cxx.

534 {
535 if (m_itemListFromTool) {
536 if (!m_streamer->getInputItemList(&*m_p2BWritten).isSuccess()) {
537 ATH_MSG_WARNING("collectAllObjects() could not get ItemList from Tool.");
538 }
539 }
540
541 // This holds the vetoes for the AuxID selection
542 auto vetoes = std::make_unique<SG::SelectionVetoes>();
543 // This holds the lossy float compression information
544 auto compInfo = std::make_unique<SG::CompressionInfo>();
545
546 m_p2BWritten->updateItemList(true);
547 // Collect all objects that need to be persistified:
548 for (const auto& i : *m_p2BWritten) {
549 ATH_CHECK( addItemObjects(ctx, i, *vetoes, *compInfo) );
550 }
551
552 // If there were any variable selections, record the information in SG.
553 if (!vetoes->empty()) {
554 ATH_CHECK( SG::makeHandle (m_selVetoesKey, ctx).record (std::move (vetoes)) );
555 }
556
557 // Store the lossy float compression information in the SG.
558 if (!compInfo->empty()) {
559 ATH_CHECK( SG::makeHandle (m_compInfoKey, ctx).record (std::move (compInfo)) );
560 }
561
562 return StatusCode::SUCCESS;
563}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
BooleanProperty m_itemListFromTool
Set to write out everything in input DataHeader.
StatusCode addItemObjects(const EventContext &, const SG::FolderItem &, SG::SelectionVetoes &vetoes, SG::CompressionInfo &compInfo)
Add item data objects to output streamer list.
SG::WriteHandleKey< SG::CompressionInfo > m_compInfoKey
Key used for recording lossy float compressed variable information to the event store.
SG::WriteHandleKey< SG::SelectionVetoes > m_selVetoesKey
Key used for recording selected dynamic variable information to the event store.
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())

◆ compressionListHandlerHigh()

void AthenaOutputStream::compressionListHandlerHigh ( Gaudi::Details::PropertyBase & )
protected

Handler for ItemNames Property.

Definition at line 875 of file AthenaOutputStream.cxx.

875 {
876 IProperty *pAsIProp(nullptr);
877 if ((m_compressionDecoderHigh.retrieve()).isFailure() ||
878 nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_compressionDecoderHigh)) ||
879 (pAsIProp->setProperty("ItemList", m_compressionListHigh.toString())).isFailure()) {
880 throw GaudiException("Folder property [ItemList] not found", name(), StatusCode::FAILURE);
881 }
882}

◆ compressionListHandlerLow()

void AthenaOutputStream::compressionListHandlerLow ( Gaudi::Details::PropertyBase & )
protected

Handler for ItemNames Property.

Definition at line 884 of file AthenaOutputStream.cxx.

884 {
885 IProperty *pAsIProp(nullptr);
886 if ((m_compressionDecoderLow.retrieve()).isFailure() ||
887 nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_compressionDecoderLow)) ||
888 (pAsIProp->setProperty("ItemList", m_compressionListLow.toString())).isFailure()) {
889 throw GaudiException("Folder property [ItemList] not found", name(), StatusCode::FAILURE);
890 }
891}

◆ excludeListHandler()

void AthenaOutputStream::excludeListHandler ( Gaudi::Details::PropertyBase & )
protected

Handler for ItemNames Property.

◆ execute()

StatusCode AthenaOutputStream::execute ( )
overridevirtual

Definition at line 406 of file AthenaOutputStream.cxx.

406 {
407 const EventContext& ctx = Gaudi::Hive::currentContext();
408 bool failed = false;
409 // Call tool preExecute prior to writing
410 for (auto& tool : m_helperTools) {
411 if (!tool->preExecute().isSuccess()) {
412 failed = true;
413 }
414 }
415 // Write the event if the event is accepted
416 if (isEventAccepted()) {
417 if (write(ctx).isFailure()) {
418 failed = true;
419 }
420 }
421 // Call tool postExecute after writing
422 for (auto& tool : m_helperTools) {
423 if(!tool->postExecute().isSuccess()) {
424 failed = true;
425 }
426 }
427 // See if we should write metadata and do if so
429 writeMetaData(ctx);
431 // finalize will disconnect output
432 if( !finalize().isSuccess() ) {
433 failed = true;
434 }
435 }
436 if (failed) {
437 return(StatusCode::FAILURE);
438 }
439 return(StatusCode::SUCCESS);
440}
virtual StatusCode finalize() override
ToolHandleArray< IAthenaOutputTool > m_helperTools
vector of AlgTools that are executed by this stream
virtual StatusCode write(const EventContext &ctx)
Stream the data.
void writeMetaData(const EventContext &ctx, const std::string &outputFN="")
Write MetaData for this stream (by default) or for a substream outputFN (in ES mode).

◆ finalize()

StatusCode AthenaOutputStream::finalize ( )
overridevirtual

Definition at line 378 of file AthenaOutputStream.cxx.

379{
380 bool failed = false;
381 ATH_MSG_DEBUG("finalize: Optimize output");
382 // Connect the output file to the service
383 if (!m_streamer->finalizeOutput().isSuccess()) {
384 failed = true;
385 }
386 ATH_MSG_DEBUG("finalize: end optimize output");
387 // Release the tools
388 if (!m_helperTools.release().isSuccess()) {
389 failed = true;
390 }
391 if (!m_streamer.release().isSuccess()) {
392 failed = true;
393 }
394 if (failed) {
395 return(StatusCode::FAILURE);
396 }
397 // Clear the internal caches
398 m_objects.clear();
399 m_objects.shrink_to_fit();
400 m_ownedObjects.clear();
401 m_altObjects.clear();
402 return(StatusCode::SUCCESS);
403}

◆ finalizeRange()

void AthenaOutputStream::finalizeRange ( const EventContext & ctx,
const std::string & rangeFN )
private

Definition at line 298 of file AthenaOutputStream.cxx.

299{
300 ATH_MSG_DEBUG(std::format("Writing MetaData to {}", rangeFN));
301 // MN: not calling StopMetaData Incident here but directly writeMetaData() - OK for Sim, check others
302 // metadata tools like CutFlowSvc are not able to handle this yet
303 const std::string rememberID = m_outSeqSvc->setRangeID( ctx, m_rangeIDforRangeFN[ rangeFN ] );
304 writeMetaData( ctx, rangeFN );
305 m_outSeqSvc->setRangeID( ctx, rememberID );
306
307 ATH_MSG_INFO(std::format("Finished writing Event Sequence to {}", rangeFN));
308 auto strm_iter = m_streamerMap.find( rangeFN );
309 strm_iter->second->finalizeOutput().ignore();
310 strm_iter->second->finalize().ignore();
311 m_streamerMap.erase( strm_iter );
312 m_outSeqSvc->publishRangeReport( rangeFN );
313}
#define ATH_MSG_INFO(x)
std::map< std::string, std::string > m_rangeIDforRangeFN
map of RangeIDs (as used by the Sequencer) for each Range filename generated

◆ handle()

void AthenaOutputStream::handle ( const Incident & incident)
overridevirtual

Incident handler: reacts to MetaDataStop and, when the OutputStreamSequencerSvc is in use, to BeginProcessing/EndProcessing.

Definition at line 223 of file AthenaOutputStream.cxx.

224{
225 ATH_MSG_DEBUG(std::format("handle() incident type: {}", inc.type()));
226 // mutex shared with write() which is called from writeMetaData
227 std::unique_lock<mutex_t> lock(m_mutex);
228
229 if( inc.type() == "MetaDataStop" ) {
230 if( m_outSeqSvc->inUse() ) {
231 if( m_outSeqSvc->inConcurrentEventsMode() ) {
232 // EventService MT - write metadata and close all remaining substreams
233 while( m_streamerMap.size() > 0 ) {
234 finalizeRange( inc.context(), m_streamerMap.begin()->first );
235 }
236 return;
237 }
238 if( m_outSeqSvc->lastIncident() == "EndEvent" ) {
239 // in r22 EndEvent comes before output writing
240 // - queue metadata writing and disconnect for after Event write
242 return;
243 }
244 }
245 // not in Event Service
246 writeMetaData(inc.context());
247 }
248 else if( m_outSeqSvc->inUse() ) {
249 // Handle Event Ranges for Event Service
250 EventContext::ContextID_t slot = inc.context().slot();
251 if( slot == EventContext::INVALID_CONTEXT_ID ) {
252 throw GaudiException("Received Incident with invalid slot in ES mode", name(), StatusCode::FAILURE);
253 }
254 auto count_events_in_range = [&](const std::string& range) {
255 return std::count_if(m_slotRangeMap.cbegin(), m_slotRangeMap.cend(),
256 [&](auto& el){return el.second == range;} );
257 };
258 if( inc.type() == IncidentType::BeginProcessing ) {
259 // get the current/old range filename for this slot
260 const std::string rangeFN = m_slotRangeMap[ slot ];
261 // build the new range filename for this slot
262 const std::string newRangeFN = m_outSeqSvc->buildSequenceFileName(inc.context(), m_outputName );
263 if( !rangeFN.empty() and rangeFN != newRangeFN ) {
264 ATH_MSG_INFO(std::format("Slot range change: '{}' -> '{}'", rangeFN, newRangeFN));
265 ATH_MSG_DEBUG(std::format("There are {} slots in use",m_slotRangeMap.size()));
266 for(const auto & range : m_slotRangeMap ) {
267 ATH_MSG_DEBUG(std::format("Slot: {} FN={}", range.first, range.second));
268 }
269 if( count_events_in_range(rangeFN) == 1 ) {
270 finalizeRange( inc.context(), rangeFN );
271 }
272 }
273 ATH_MSG_INFO(std::format("slot {} processing event in range: {}", slot, newRangeFN));
274 m_slotRangeMap[ slot ] = newRangeFN;
275 // remember the RangeID for this slot so we can write metadata *after* a range change
276 m_rangeIDforRangeFN[ newRangeFN ] = m_outSeqSvc->currentRangeID(inc.context());
277 }
278 else if( inc.type() == IncidentType::EndProcessing ) {
279 ATH_MSG_DEBUG(std::format("There are {} slots in use", m_slotRangeMap.size()));
280 for( const auto& range : m_slotRangeMap ) {
281 ATH_MSG_DEBUG(std::format("Slot: {} FN={}", range.first, range.second));
282 }
283 if( m_slotRangeMap.size() > 1 ) {
284 // if there are multiple slots, we can detect if the range ended with this event
285 // - except the last range, because there is no next range to clear the slot map
286 const std::string rangeFN = m_slotRangeMap[ slot ];
287 if( count_events_in_range(rangeFN) == 1 ) {
288 finalizeRange( inc.context(), rangeFN );
289 m_slotRangeMap[ slot ].clear();
290 }
291 }
292 }
293 }
294 ATH_MSG_DEBUG(std::format("Leaving incident handler for {}", inc.type()));
295}
virtual void lock()=0
Interface to allow an object to lock itself when made const in SG.
std::map< unsigned, std::string > m_slotRangeMap
map of filenames assigned to active slots
void finalizeRange(const EventContext &ctx, const std::string &rangeFN)
mutex_t m_mutex
mutex for this Stream write() and handle() methods

◆ handleVariableSelection()

void AthenaOutputStream::handleVariableSelection ( const SG::IConstAuxStore & auxstore,
SG::DataProxy & itemProxy,
const std::string & aux_attr,
SG::SelectionVetoes & vetoes ) const
private

Here we build the vetoed AuxIDs.

Definition at line 819 of file AthenaOutputStream.cxx.

823{
824 // Collect dynamic Aux selection (parse the line, attributes separated by dot)
825 std::set<std::string> attributes;
826 if( aux_attr.size() ) {
827 std::stringstream ss(aux_attr);
828 std::string attr;
829 while( std::getline(ss, attr, '.') ) {
830 attributes.insert(attr);
831 }
832 }
833
834 // Return early if there's no selection.
835 if (attributes.empty()) {
836 return;
837 }
838
839 std::string key = itemProxy.name();
840 if (key.size() >= 4 && key.compare (key.size()-4, 4, "Aux.")==0)
841 {
842 key.erase (key.size()-4, 4);
843 }
844
845 // Find the entry for the selection.
846 SG::auxid_set_t& vset = vetoes[key];
847
848 // Form the veto mask for this object.
849 xAOD::AuxSelection sel;
850 sel.selectAux (attributes);
851
852 // Get all the AuxIDs that we know of and the selected ones
853 SG::auxid_set_t all = auxstore.getAuxIDs();
854 SG::auxid_set_t selected = sel.getSelectedAuxIDs( all );
855
856 // Loop over all and build a list of vetoed AuxIDs from non selected ones
857 for( const SG::auxid_t auxid : all ) {
858 if ( !selected.test( auxid ) ) {
859 vset.insert( auxid );
860 }
861 }
862}
virtual const name_type & name() const override final
Retrieve data object key == string.
size_t auxid_t
Identifier for a particular aux data item.
Definition AuxTypes.h:27

◆ initialize()

StatusCode AthenaOutputStream::initialize ( )
overridevirtual

Definition at line 73 of file AthenaOutputStream.cxx.

73 {
74 ATH_MSG_DEBUG("In initialize");
75
76 // Initialize the FilteredAlgorithm base
78
79 // Reset the number of events written
80 m_events = 0;
81
82 // Set up the SG services
83 ATH_CHECK( m_dataStore.retrieve() );
84 ATH_MSG_DEBUG(std::format("Found {} store.", m_dataStore.typeAndName()));
85 if (!m_metadataItemList.value().empty()) {
86 ATH_CHECK( m_metadataStore.retrieve() );
87 ATH_MSG_DEBUG(std::format("Found {} store.", m_metadataStore.typeAndName()));
88 }
89
90 // Set up various services
91 ATH_CHECK( m_pCLIDSvc.retrieve() );
92 ATH_CHECK( m_dictLoader.retrieve() );
93 ATH_CHECK( m_tpCnvSvc.retrieve() );
94 ATH_CHECK( m_outSeqSvc.retrieve() );
95
96 // Get Output Stream tool for writing
97 ATH_CHECK( m_streamer.retrieve() );
98 ATH_CHECK( m_streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord) );
99
100 ATH_CHECK( m_helperTools.retrieve() );
101 ATH_MSG_INFO("Found " << m_helperTools);
102 ATH_MSG_INFO(std::format("Data output: {}", m_outputName.toString()));
103
104 for (auto& tool : m_helperTools) {
105 ATH_CHECK( tool->postInitialize() );
106 }
107
108 // Register this algorithm for 'I/O' events
109 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
110 ATH_CHECK( iomgr.retrieve() );
111 ATH_CHECK( iomgr->io_register(this) );
112 ATH_CHECK( iomgr->io_register(this, IIoComponentMgr::IoMode::WRITE, m_outputName) );
113 ATH_CHECK( this->io_reinit() );
114
115 // Add an explicit input dependency for everything in our item list
116 // that we know from the configuration is in the transient store.
117 // We don't want to add everything on the list, because configurations
118 // often initialize this with a maximal static list of everything
119 // that could possibly be written.
120 {
121 ATH_CHECK( m_transient.retrieve() );
122 IProperty *pAsIProp = dynamic_cast<IProperty*> (&*m_transient);
123 if (!pAsIProp) {
124 ATH_MSG_FATAL ("Bad folder interface");
125 return StatusCode::FAILURE;
126 }
127 ATH_CHECK (pAsIProp->setProperty("ItemList", m_transientItems.toString()));
128
129 for (const SG::FolderItem& item : *m_p2BWritten) {
130 // Load ROOT dictionaries now.
131 loadDict (item.id());
132
133 const std::string& k = item.key();
134 if (k.find('*') != std::string::npos) continue;
135 if (k.find('.') != std::string::npos) continue;
136 for (const SG::FolderItem& titem : *m_transient) {
137 if (titem.id() == item.id() && titem.key() == k) {
138 DataObjID id (item.id(), std::format("{}+{}", m_dataStore.name(), k));
139 this->addDependency (id, Gaudi::DataHandle::Reader);
140 break;
141 }
142 }
143 }
144 m_transient->clear();
145 }
146
147 // Also load dictionaries for metadata classes.
148 if (!m_metadataItemList.value().empty()) {
149 IProperty *pAsIProp = dynamic_cast<IProperty*> (&*m_transient);
150 if (!pAsIProp) {
151 ATH_MSG_FATAL ("Bad folder interface");
152 return StatusCode::FAILURE;
153 }
154 ATH_CHECK (pAsIProp->setProperty("ItemList", m_metadataItemList.toString()));
155 for (const SG::FolderItem& item : *m_transient) {
156 loadDict (item.id());
157 }
158 m_transient->clear();
159 }
160
161 // Also make sure we have the dictionary for Token.
162 m_dictLoader->load_type ("Token");
163
164 // Listen to event range incidents if incident name is configured
165 ATH_CHECK( m_incidentSvc.retrieve() );
166 if( !m_outSeqSvc->incidentName().empty() ) {
167 // use priority 95 to make sure the Output Sequencer goes first (it has priority 100)
168 m_incidentSvc->addListener(this, IncidentType::BeginProcessing, 95);
169 m_incidentSvc->addListener(this, IncidentType::EndProcessing, 95);
170 }
171
172 // Check compression settings and print some information about the configuration
173 // Both should be between [5, 23] and high compression should be < low compression
175 ATH_MSG_INFO(std::format("Float compression mantissa bits for high compression "
176 "({}) is outside the allowed range of [5, 23].",
177 m_compressionBitsHigh.toString()));
178 ATH_MSG_INFO("Setting it to the appropriate limit.");
180 }
182 ATH_MSG_INFO(std::format("Float compression mantissa bits for low compression "
183 "({}) is outside the allowed range of [5, 23].",
184 m_compressionBitsLow.toString()));
185 ATH_MSG_INFO("Setting it to the appropriate limit.");
187 }
189 ATH_MSG_ERROR(std::format("Float compression mantissa bits for low compression "
190 "({}) is lower than or equal to high compression "
191 "({})! Please check the configuration! ",
192 m_compressionBitsLow.toString(),
193 m_compressionBitsHigh.toString()));
194 return StatusCode::FAILURE;
195 }
196 if(m_compressionListHigh.value().empty() && m_compressionListLow.value().empty()) {
197 ATH_MSG_VERBOSE("Both high and low float compression lists are empty. Float compression will NOT be applied.");
198 } else {
199 ATH_MSG_INFO("Either high or low (or both) float compression lists are defined. Float compression will be applied.");
200 ATH_MSG_INFO(std::format("High compression will use {} mantissa bits, and "
201 "low compression will use {} mantissa bits.",
202 m_compressionBitsHigh.toString(),
203 m_compressionBitsLow.toString()));
204 }
205
206 // Setup stream name
207 if (m_streamName.empty()) {
208 m_streamName.setValue(this->name());
209 }
210
211 // Set SG key for selected variable information.
212 m_selVetoesKey = std::format("SelectionVetoes_{}", m_streamName.toString());
213 ATH_CHECK( m_selVetoesKey.initialize() );
214
215 m_compInfoKey = std::format("CompressionInfo_{}", m_streamName.toString());
216 ATH_CHECK( m_compInfoKey.initialize() );
217
218 ATH_MSG_DEBUG("End initialize");
219 return StatusCode::SUCCESS;
220}
#define ATH_MSG_FATAL(x)
#define ATH_MSG_VERBOSE(x)
StringProperty m_streamName
Stream name (defaults to algorithm name).
StringArrayProperty m_transientItems
List of items that are known to be present in the transient store (and hence we can make input depend...
ServiceHandle< IClassIDSvc > m_pCLIDSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
ServiceHandle< IDictLoaderSvc > m_dictLoader
StringProperty m_persName
Name of the persistency service capable to write data from the store.
void loadDict(CLID clid)
Helper function to load dictionaries (both transient and persistent) for a given type.
ServiceHandle< ITPCnvSvc > m_tpCnvSvc
StringArrayProperty m_metadataItemList
Vector of item names.
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
BooleanProperty m_extendProvenanceRecord
Set to false to omit adding the current DataHeader into the DataHeader history. This will cause the in...
std::atomic< int > m_events
Number of events written to this output stream.
virtual StatusCode initialize()

◆ io_finalize()

StatusCode AthenaOutputStream::io_finalize ( )
overridevirtual

Definition at line 909 of file AthenaOutputStream.cxx.

909 {
910 ATH_MSG_INFO("I/O finalization...");
911 for (auto& tool : m_helperTools) {
912 if (!tool->preFinalize().isSuccess()) {
913 ATH_MSG_ERROR("Cannot finalize helper tool");
914 }
915 }
916 const Incident metaDataStopIncident(name(), "MetaDataStop");
917 this->handle(metaDataStopIncident);
918 m_incidentSvc->removeListener(this, "MetaDataStop");
919 if (m_dataStore->clearStore().isFailure()) {
920 ATH_MSG_WARNING("Cannot clear the DataStore");
921 }
922 return StatusCode::SUCCESS;
923}

◆ io_reinit()

StatusCode AthenaOutputStream::io_reinit ( )
overridevirtual

Callback method to reinitialize the internal state of the component for I/O purposes (e.g. upon fork(2)).

Definition at line 895 of file AthenaOutputStream.cxx.

895 {
896 ATH_MSG_INFO("I/O reinitialization...");
897 m_incidentSvc->removeListener(this, "MetaDataStop"); // Remove any existing listener to avoid handling the incident multiple times
898 m_incidentSvc->addListener(this, "MetaDataStop", 50);
899 for (auto& tool : m_helperTools) {
900 if (!tool->postInitialize().isSuccess()) {
901 ATH_MSG_ERROR("Cannot initialize helper tool");
902 }
903 }
904 return StatusCode::SUCCESS;
905}

◆ itemListHandler()

void AthenaOutputStream::itemListHandler ( Gaudi::Details::PropertyBase & )
protected

Handler for the ItemList property.

Definition at line 865 of file AthenaOutputStream.cxx.

865 {
866 // Assuming concrete SG::Folder also has an itemList property
867 IProperty *pAsIProp(nullptr);
868 if ((m_p2BWritten.retrieve()).isFailure() ||
869 nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
870 (pAsIProp->setProperty(m_itemList)).isFailure()) {
871 throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
872 }
873}

◆ loadDict()

void AthenaOutputStream::loadDict ( CLID clid)
private

Helper function to load dictionaries (both transient and persistent) for a given type.

We want to do this explicitly during initialization to avoid sporadic failures seen loading dictionaries while multiple threads are running. See ATEAM-697 and ATEAM-749.

Definition at line 930 of file AthenaOutputStream.cxx.

931{
932 m_dictLoader->load_type (clid);
933
934 // Also load the persistent class dictionary, if applicable.
935 std::unique_ptr<ITPCnvBase> tpcnv = m_tpCnvSvc->t2p_cnv_unique (clid);
936 if (tpcnv) {
937 m_dictLoader->load_type (tpcnv->persistentTInfo());
938 }
939}

◆ selectedObjects()

IDataSelector * AthenaOutputStream::selectedObjects ( )
inline

Return the list of selected objects.

Definition at line 85 of file AthenaOutputStream.h.

85 {
86 return &m_objects;
87 }

◆ simpleMatch()

bool AthenaOutputStream::simpleMatch ( const std::string & pattern,
const std::string & text )
private

Glob-style matcher, where the only meta-character is '*'.

Definition at line 942 of file AthenaOutputStream.cxx.

943 {
944 size_t pi = 0, ti = 0, star = std::string::npos, match = 0;
945 while (ti < text.size()) {
946 if (pi < pattern.size() && (pattern[pi] == text[ti] || pattern[pi] == '*')) {
947 if (pattern[pi] == '*') { star = pi++; match = ti; }
948 else { ++pi; ++ti; }
949 } else if (star != std::string::npos) {
950 pi = star + 1;
951 ti = ++match;
952 } else return false;
953 }
954 while (pi < pattern.size() && pattern[pi] == '*') ++pi;
955 return pi == pattern.size();
956}
#define pi

◆ write()

StatusCode AthenaOutputStream::write ( const EventContext & ctx)
virtual

Stream the data.

Definition at line 443 of file AthenaOutputStream.cxx.

443 {
444 bool failed = false;
445 IAthenaOutputStreamTool* streamer = &*m_streamer;
446 std::string outputFN;
447
448 std::unique_lock<mutex_t> lock(m_mutex);
449 outputFN = m_outSeqSvc->buildSequenceFileName( ctx, m_outputName );
450
451 // Handle Event Ranges
452 if( m_outSeqSvc->inUse() and m_outSeqSvc->inConcurrentEventsMode() ) {
453 ATH_MSG_DEBUG(std::format("Writing event sequence to {}", outputFN));
454 streamer = m_streamerMap[ outputFN ].get();
455 if( !streamer ) {
456 // new range, needs a new streamer tool
457 IAlgTool* st = AlgTool::Factory::create( m_streamer->type(), m_streamer->type(), m_streamer->name(), this ).release();
458 st->addRef();
459 streamer = dynamic_cast<IAthenaOutputStreamTool*>( st );
460 IProperty *mstreamer_props = dynamic_cast<IProperty*> (&*m_streamer);
461 IProperty *streamer_props = dynamic_cast<IProperty*> (&*streamer);
462 if (!mstreamer_props || !streamer_props) {
463 ATH_MSG_FATAL("Cannot cast streamer to IProperty");
464 return StatusCode::FAILURE;
465 }
466 for ( const auto& prop : mstreamer_props->getProperties() ) {
467 ATH_CHECK( streamer_props->setProperty( *prop ) );
468 }
469 if( !streamer or streamer->initialize().isFailure()
470 or streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord).isFailure() ) {
471 ATH_MSG_FATAL(std::format("Unable to initialize OutputStreamTool for {}", outputFN));
472 return StatusCode::FAILURE;
473 }
474 m_streamerMap[ outputFN ].reset( streamer );
475 }
476 }
477
478 // Clear any previously existing item list
479 // and collect all objects that are asked to be written out
482
483 // keep a local copy of the object lists so they are not overwritten when we release the lock
484 IDataSelector objects = std::move( m_objects );
485 IDataSelector altObjects = std::move( m_altObjects );
486 std::vector<std::unique_ptr<DataObject> > ownedObjects = std::move( m_ownedObjects );
487
488 // prepare before releasing lock because m_outputAttributes change in metadataStop
489 const std::string connectStr = outputFN + m_outputAttributes;
490
491 for (auto& tool : m_helperTools) {
492 ATH_CHECK( tool->preStream() );
493 }
494
495 // MN: would be nice to release the Stream lock here
496 // lock.unlock();
497
498 // Connect the output file to the service
499 if (!streamer->connectOutput(connectStr).isSuccess()) {
500 ATH_MSG_FATAL("Could not connectOutput");
501 return StatusCode::FAILURE;
502 }
503 ATH_MSG_DEBUG(std::format("connectOutput done for {}", outputFN));
504 StatusCode currentStatus = streamer->streamObjects(objects, connectStr);
505 // Do final check of streaming
506 if (!currentStatus.isSuccess()) {
507 if (!currentStatus.isRecoverable()) {
508 ATH_MSG_FATAL("streamObjects failed.");
509 failed = true;
510 } else {
511 ATH_MSG_DEBUG("streamObjects failed.");
512 }
513 }
514 bool doCommit = false;
515 if (!streamer->commitOutput(doCommit).isSuccess()) {
516 ATH_MSG_FATAL("commitOutput failed.");
517 failed = true;
518 }
519 if (failed) {
520 return(StatusCode::FAILURE);
521 }
522 m_events++;
523 return(StatusCode::SUCCESS);
524}
void clearSelection()
Clear list of selected objects.
StatusCode collectAllObjects(const EventContext &ctx)
Collect data objects for output streamer list.
std::string m_outputAttributes
Output attributes.
virtual StatusCode connectServices(const std::string &dataStore, const std::string &cnvSvc, bool extendProvenenceRecord=false)=0
Specify which data store and conversion service to use and whether to extend provenance. Only use if o...
virtual StatusCode streamObjects(const TypeKeyPairs &typeKeys, const std::string &outputName="")=0
virtual StatusCode connectOutput(const std::string &outputName="")=0
Connect to the output stream. Must connectOutput BEFORE streaming. Only specify "outputName" if one wan...
virtual StatusCode commitOutput(bool doCommit=false)=0
Commit the output stream after having streamed out objects. Must commitOutput AFTER streaming.
::StatusCode StatusCode
StatusCode definition for legacy code.

◆ writeMetaData()

void AthenaOutputStream::writeMetaData ( const EventContext & ctx,
const std::string & outputFN = "" )
private

Write MetaData for this stream (by default) or for a substream outputFN (in ES mode).

Definition at line 318 of file AthenaOutputStream.cxx.

319{
320 // use main stream tool by default, or per outputFile in ES mode
321 IAthenaOutputStreamTool* streamer = outputFN.empty()? &*m_streamer : m_streamerMap[outputFN].get();
322
323 for (auto& tool : m_helperTools) {
324 if (!tool->preFinalize().isSuccess()) {
325 throw GaudiException("Cannot finalize helper tool", name(), StatusCode::FAILURE);
326 }
327 }
328 if( m_metaDataSvc->prepareOutput(outputFN).isFailure() ) {
329 throw GaudiException("Failed on MetaDataSvc prepareOutput", name(), StatusCode::FAILURE);
330 }
331 // lock all metadata to prevent updates during writing
332 MetaDataSvc::ToolLockGuard tool_guard( *m_metaDataSvc );
333
334 // Prepare the WriteDataHeaderForms incident
335 std::string DHFWriteIncidentfileName = m_outSeqSvc->buildSequenceFileName(ctx, m_outputName);
336 // remove technology from the name
337 size_t pos = DHFWriteIncidentfileName.find(':');
338 if( pos != std::string::npos ) DHFWriteIncidentfileName = DHFWriteIncidentfileName.substr(pos+1);
339 FileIncident incident(name(), "WriteDataHeaderForms", DHFWriteIncidentfileName);
340 m_incidentSvc->fireIncident(incident);
341
342 ATH_MSG_DEBUG("metadataItemList: " << m_metadataItemList.value() );
343 if (!m_metadataItemList.value().empty()) {
345 StatusCode status = streamer->connectServices(m_metadataStore.typeAndName(), m_persName, false);
346 if (status.isFailure()) {
347 throw GaudiException("Unable to connect metadata services", name(), StatusCode::FAILURE);
348 }
349 m_outputAttributes = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
350 m_p2BWritten->clear();
351 IProperty *pAsIProp(nullptr);
352 if ((m_p2BWritten.retrieve()).isFailure() ||
353 nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
354 (pAsIProp->setProperty("ItemList", m_metadataItemList.toString())).isFailure()) {
355 throw GaudiException("Folder property [metadataItemList] not found", name(), StatusCode::FAILURE);
356 }
357 if (write(ctx).isFailure()) {
358 throw GaudiException("Cannot write metadata", name(), StatusCode::FAILURE);
359 }
360 FileIncident incident(name(), "WriteDataHeaderForms", DHFWriteIncidentfileName + m_outputAttributes);
361 m_incidentSvc->fireIncident(incident);
362
363 m_outputAttributes.clear();
366 if (status.isFailure()) {
367 throw GaudiException("Unable to re-connect services", name(), StatusCode::FAILURE);
368 }
369 m_p2BWritten->clear();
370 if ((pAsIProp->setProperty(m_itemList)).isFailure()) {
371 throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
372 }
373 ATH_MSG_DEBUG(std::format("Metadata items written: {}", m_metadataItemList.value().size()));
374 }
375}
status
Definition merge.py:16

Member Data Documentation

◆ m_altObjects

IDataSelector AthenaOutputStream::m_altObjects
protected

Objects overridden by 'exact' handling.

Definition at line 175 of file AthenaOutputStream.h.

◆ m_CLIDKeyPairs

std::multimap<CLID,std::string> AthenaOutputStream::m_CLIDKeyPairs
protected

Map of (clid,key) pairs to be excluded (comes from m_excludeList).

Definition at line 169 of file AthenaOutputStream.h.

◆ m_compInfoKey

SG::WriteHandleKey<SG::CompressionInfo> AthenaOutputStream::m_compInfoKey { this, "CompInfoKey", "" }
private

Key used for recording lossy float compressed variable information to the event store.

Definition at line 227 of file AthenaOutputStream.h.

228{ this, "CompInfoKey", "" };

◆ m_compressionBitsHigh

UnsignedIntegerProperty AthenaOutputStream::m_compressionBitsHigh {this, "CompressionBitsHigh", 7, "Lossy float compression bits (high)"}
protected

Number of mantissa bits kept in the high lossy float compression.

Definition at line 131 of file AthenaOutputStream.h.

131{this, "CompressionBitsHigh", 7, "Lossy float compression bits (high)"};

◆ m_compressionBitsLow

UnsignedIntegerProperty AthenaOutputStream::m_compressionBitsLow {this, "CompressionBitsLow", 15, "Lossy float compression bits (low)"}
protected

Number of mantissa bits kept in the low lossy float compression.

Definition at line 134 of file AthenaOutputStream.h.

134{this, "CompressionBitsLow", 15, "Lossy float compression bits (low)"};

◆ m_compressionDecoderHigh

ToolHandle<SG::IFolder> AthenaOutputStream::m_compressionDecoderHigh
protected

The top-level folder with items to be compressed high.

Definition at line 160 of file AthenaOutputStream.h.

◆ m_compressionDecoderLow

ToolHandle<SG::IFolder> AthenaOutputStream::m_compressionDecoderLow
protected

The top-level folder with items to be compressed low.

Definition at line 163 of file AthenaOutputStream.h.

◆ m_compressionListHigh

StringArrayProperty AthenaOutputStream::m_compressionListHigh {this, "CompressionListHigh", {}, "Lossy float compression list (high)"}
protected

Vector of item names for the high lossy float compression list.

Definition at line 125 of file AthenaOutputStream.h.

125{this, "CompressionListHigh", {}, "Lossy float compression list (high)"};

◆ m_compressionListLow

StringArrayProperty AthenaOutputStream::m_compressionListLow {this, "CompressionListLow", {}, "Lossy float compression list (low)"}
protected

Vector of item names for the low lossy float compression list.

Definition at line 128 of file AthenaOutputStream.h.

128{this, "CompressionListLow", {}, "Lossy float compression list (low)"};

◆ m_currentStore

ServiceHandle<StoreGateSvc>* AthenaOutputStream::m_currentStore
protected

Definition at line 101 of file AthenaOutputStream.h.

◆ m_dataStore

ServiceHandle<StoreGateSvc> AthenaOutputStream::m_dataStore {this, "Store", "StoreGateSvc/StoreGateSvc", "Handle to event store"}
protected

Handle to the StoreGateSvc store where the data we want to write out resides.

Definition at line 99 of file AthenaOutputStream.h.

99{this, "Store", "StoreGateSvc/StoreGateSvc", "Handle to event store"};

◆ m_dictLoader

ServiceHandle<IDictLoaderSvc> AthenaOutputStream::m_dictLoader {this, "AthDictLoaderSvc", "AthDictLoaderSvc"}
protected

Definition at line 105 of file AthenaOutputStream.h.

105{this, "AthDictLoaderSvc", "AthDictLoaderSvc"};

◆ m_events

std::atomic<int> AthenaOutputStream::m_events {0}
protected

Number of events written to this output stream.

Definition at line 192 of file AthenaOutputStream.h.

192{0};

◆ m_extendProvenanceRecord

BooleanProperty AthenaOutputStream::m_extendProvenanceRecord {this, "ExtendProvenanceRecord", true, "Extend provenance record"}
protected

Set to false to omit adding the current DataHeader into the DataHeader history. This will cause the input file to be neglected for back navigation (replace mode).

Definition at line 151 of file AthenaOutputStream.h.

151{this, "ExtendProvenanceRecord", true, "Extend provenance record"};

◆ m_forceRead

BooleanProperty AthenaOutputStream::m_forceRead {this, "ForceRead", true, "Force read data objects in ItemList"}
protected

set to true to force read of data objects in item list

Definition at line 147 of file AthenaOutputStream.h.

147{this, "ForceRead", true, "Force read data objects in ItemList"};

◆ m_helperTools

ToolHandleArray<IAthenaOutputTool> AthenaOutputStream::m_helperTools {this, "HelperTools", {}, "List of AlgTools used by this stream"}
protected

vector of AlgTools that are executed by this stream

Definition at line 186 of file AthenaOutputStream.h.

186{this, "HelperTools", {}, "List of AlgTools used by this stream"};

◆ m_incidentSvc

ServiceHandle<IIncidentSvc> AthenaOutputStream::m_incidentSvc {this, "IncidentSvc", "IncidentSvc"}
protected

Definition at line 107 of file AthenaOutputStream.h.

107{this, "IncidentSvc", "IncidentSvc"};

◆ m_itemList

StringArrayProperty AthenaOutputStream::m_itemList {this, "ItemList", {}, "List of items to write", "OutputStreamItemList"}
protected

Vector of item names.

Definition at line 115 of file AthenaOutputStream.h.

115{this, "ItemList", {}, "List of items to write", "OutputStreamItemList"};

◆ m_itemListFromTool

BooleanProperty AthenaOutputStream::m_itemListFromTool {this, "TakeItemsFromInput", false, "Write everything in input DataHeader to output"}
protected

Set to write out everything in input DataHeader.

Definition at line 154 of file AthenaOutputStream.h.

154{this, "TakeItemsFromInput", false, "Write everything in input DataHeader to output"};

◆ m_keepProvenances

StringProperty AthenaOutputStream::m_keepProvenances
protected
Initial value:
{this, "KeepProvenanceTagsRegEx", {".*"},
"RegEx pattern to select processing tags for which DataHeader should retain provenances"}

Provenance record selection.

Definition at line 121 of file AthenaOutputStream.h.

121 {this, "KeepProvenanceTagsRegEx", {".*"},
122 "RegEx pattern to select processing tags for which DataHeader should retain provenances"};

◆ m_metadataItemList

StringArrayProperty AthenaOutputStream::m_metadataItemList {this, "MetadataItemList", {}, "List of metadata items to write","OutputStreamItemList"}
protected

Vector of metadata item names to write.

Definition at line 118 of file AthenaOutputStream.h.

118{this, "MetadataItemList", {}, "List of metadata items to write","OutputStreamItemList"};

◆ m_metadataStore

ServiceHandle<StoreGateSvc> AthenaOutputStream::m_metadataStore {this, "MetadataStore", "StoreGateSvc/MetaDataStore", "Handle to metadata store"}
protected

Definition at line 100 of file AthenaOutputStream.h.

100{this, "MetadataStore", "StoreGateSvc/MetaDataStore", "Handle to metadata store"};

◆ m_metaDataSvc

ServiceHandle<MetaDataSvc> AthenaOutputStream::m_metaDataSvc {this, "MetaDataSvc", "MetaDataSvc"}
protected

Handles to all the necessary services.

Definition at line 104 of file AthenaOutputStream.h.

104{this, "MetaDataSvc", "MetaDataSvc"};

◆ m_mutex

mutex_t AthenaOutputStream::m_mutex
protected

mutex for this Stream write() and handle() methods

Definition at line 205 of file AthenaOutputStream.h.

◆ m_objects

IDataSelector AthenaOutputStream::m_objects
protected

Collection of objects being selected.

Definition at line 172 of file AthenaOutputStream.h.

◆ m_outputAttributes

std::string AthenaOutputStream::m_outputAttributes
private

Output attributes.

Definition at line 231 of file AthenaOutputStream.h.

◆ m_outputName

StringProperty AthenaOutputStream::m_outputName {this, "OutputFile", "DidNotNameOutput.root", "Name of the output file"}
protected

Name of the output file.

Definition at line 141 of file AthenaOutputStream.h.

141{this, "OutputFile", "DidNotNameOutput.root", "Name of the output file"};

◆ m_outSeqSvc

ServiceHandle<OutputStreamSequencerSvc> AthenaOutputStream::m_outSeqSvc {this, "OutputStreamSequencerSvc", "OutputStreamSequencerSvc"}
protected

Definition at line 109 of file AthenaOutputStream.h.

109{this, "OutputStreamSequencerSvc", "OutputStreamSequencerSvc"};

◆ m_ownedObjects

std::vector<std::unique_ptr<DataObject> > AthenaOutputStream::m_ownedObjects
protected

Collection of DataObject instances owned by this service.

FIXME: it would be simpler to just have m_objects be a vector of DataObjectSharedPtr<DataObject>, but that implies interface changes.

Definition at line 180 of file AthenaOutputStream.h.

◆ m_p2BWritten

ToolHandle<SG::IFolder> AthenaOutputStream::m_p2BWritten
protected

The top-level folder with items to be written.

Definition at line 157 of file AthenaOutputStream.h.

◆ m_pCLIDSvc

ServiceHandle<IClassIDSvc> AthenaOutputStream::m_pCLIDSvc {this, "ClassIDSvc", "ClassIDSvc"}
protected

Definition at line 108 of file AthenaOutputStream.h.

108{this, "ClassIDSvc", "ClassIDSvc"};

◆ m_persName

StringProperty AthenaOutputStream::m_persName {this, "EvtConversionSvc", "EventPersistencySvc", "Name of the persistency service writing data"}
protected

Name of the persistency service capable to write data from the store.

Definition at line 144 of file AthenaOutputStream.h.

144{this, "EvtConversionSvc", "EventPersistencySvc", "Name of the persistency service writing data"};

◆ m_rangeIDforRangeFN

std::map< std::string, std::string > AthenaOutputStream::m_rangeIDforRangeFN
protected

map of RangeIDs (as used by the Sequencer) for each Range filename generated

Definition at line 199 of file AthenaOutputStream.h.

◆ m_selVetoesKey

SG::WriteHandleKey<SG::SelectionVetoes> AthenaOutputStream::m_selVetoesKey { this, "SelVetoesKey", "" }
private

Key used for recording selected dynamic variable information to the event store.

Definition at line 222 of file AthenaOutputStream.h.

223{ this, "SelVetoesKey", "" };

◆ m_slotRangeMap

std::map< unsigned, std::string > AthenaOutputStream::m_slotRangeMap
protected

map of filenames assigned to active slots

Definition at line 196 of file AthenaOutputStream.h.

◆ m_streamer

ToolHandle<IAthenaOutputStreamTool> AthenaOutputStream::m_streamer
protected

pointer to AthenaOutputStreamTool

Definition at line 183 of file AthenaOutputStream.h.

◆ m_streamerMap

std::map< std::string, std::unique_ptr<IAthenaOutputStreamTool> > AthenaOutputStream::m_streamerMap
protected

map of streamerTools handling event ranges in MT

Definition at line 202 of file AthenaOutputStream.h.

◆ m_streamName

StringProperty AthenaOutputStream::m_streamName {this, "StreamName", "", "Name of the output stream"}
protected

Stream name (defaults to algorithm name).

Definition at line 112 of file AthenaOutputStream.h.

112{this, "StreamName", "", "Name of the output stream"};

◆ m_tpCnvSvc

ServiceHandle<ITPCnvSvc> AthenaOutputStream::m_tpCnvSvc {this, "AthTPCnvSvc", "AthTPCnvSvc"}
protected

Definition at line 106 of file AthenaOutputStream.h.

106{this, "AthTPCnvSvc", "AthTPCnvSvc"};

◆ m_transient

ToolHandle<SG::IFolder> AthenaOutputStream::m_transient
protected

Decoded list of transient ids.

Definition at line 166 of file AthenaOutputStream.h.

◆ m_transientItems

StringArrayProperty AthenaOutputStream::m_transientItems {this, "TransientItems", {}, "Transient item list"}
protected

List of items that are known to be present in the transient store (and hence we can make input dependencies on them).

Definition at line 138 of file AthenaOutputStream.h.

138{this, "TransientItems", {}, "Transient item list"};

◆ m_writeMetadataAndDisconnect

bool AthenaOutputStream::m_writeMetadataAndDisconnect {false}
protected

Definition at line 189 of file AthenaOutputStream.h.

189{false};

The documentation for this class was generated from the following files: