 |
ATLAS Offline Software
|
Go to the documentation of this file.
18 #include "GaudiKernel/AlgTool.h"
19 #include "GaudiKernel/ClassID.h"
20 #include "GaudiKernel/GaudiException.h"
21 #include "GaudiKernel/IAlgManager.h"
22 #include "GaudiKernel/IIoComponentMgr.h"
23 #include "GaudiKernel/IOpaqueAddress.h"
24 #include "GaudiKernel/IProperty.h"
25 #include "GaudiKernel/ISvcLocator.h"
26 #include "GaudiKernel/MsgStream.h"
44 : base_class(
name, pSvcLocator),
45 m_currentStore(&m_dataStore),
46 m_p2BWritten(std::
format(
"SG::Folder/{}_TopFolder",
name), this),
47 m_compressionDecoderHigh(std::
format(
"SG::Folder/{}_compressed_high",
name), this),
48 m_compressionDecoderLow(std::
format(
"SG::Folder/{}_compressed_low",
name), this),
49 m_transient(std::
format(
"SG::Folder/{}_transient",
name), this),
50 m_streamer(std::
format(
"AthenaOutputStreamTool/{}Tool",
name), this)
123 IProperty *pAsIProp =
dynamic_cast<IProperty*
> (&*
m_transient);
126 return StatusCode::FAILURE;
134 const std::string&
k =
item.key();
135 if (
k.find(
'*') != std::string::npos)
continue;
136 if (
k.find(
'.') != std::string::npos)
continue;
138 if (titem.id() ==
item.id() && titem.key() ==
k) {
150 IProperty *pAsIProp =
dynamic_cast<IProperty*
> (&*
m_transient);
153 return StatusCode::FAILURE;
169 m_incidentSvc->addListener(
this, IncidentType::BeginProcessing, 95);
170 m_incidentSvc->addListener(
this, IncidentType::EndProcessing, 95);
175 if(m_compressionBitsHigh < 5 || m_compressionBitsHigh > 23) {
177 "({}) is outside the allowed range of [5, 23].",
182 if(m_compressionBitsLow < 5 || m_compressionBitsLow > 23) {
184 "({}) is outside the allowed range of [5, 23].",
191 "({}) is lower than or equal to high compression "
192 "({})! Please check the configuration! ",
195 return StatusCode::FAILURE;
198 ATH_MSG_VERBOSE(
"Both high and low float compression lists are empty. Float compression will NOT be applied.");
200 ATH_MSG_INFO(
"Either high or low (or both) float compression lists are defined. Float compression will be applied.");
202 "low compression will use {} mantissa bits.",
220 return StatusCode::SUCCESS;
230 if( inc.type() ==
"MetaDataStop" ) {
251 EventContext::ContextID_t slot = inc.context().slot();
252 if( slot == EventContext::INVALID_CONTEXT_ID ) {
253 throw GaudiException(
"Received Incident with invalid slot in ES mode",
name(), StatusCode::FAILURE);
255 auto count_events_in_range = [&](
const std::string&
range) {
257 [&](
auto&
el){return el.second == range;} );
259 if( inc.type() == IncidentType::BeginProcessing ) {
264 if( !rangeFN.empty() and rangeFN != newRangeFN ) {
270 if( count_events_in_range(rangeFN) == 1 ) {
279 else if( inc.type() == IncidentType::EndProcessing ) {
288 if( count_events_in_range(rangeFN) == 1 ) {
310 strm_iter->second->finalizeOutput().ignore();
311 strm_iter->second->finalize().ignore();
325 if (!
tool->preFinalize().isSuccess()) {
326 throw GaudiException(
"Cannot finalize helper tool",
name(), StatusCode::FAILURE);
330 throw GaudiException(
"Failed on MetaDataSvc prepareOutput",
name(), StatusCode::FAILURE);
338 size_t pos = DHFWriteIncidentfileName.find(
':');
339 if(
pos != std::string::npos ) DHFWriteIncidentfileName = DHFWriteIncidentfileName.substr(
pos+1);
340 FileIncident incident(
name(),
"WriteDataHeaderForms", DHFWriteIncidentfileName);
348 throw GaudiException(
"Unable to connect metadata services",
name(), StatusCode::FAILURE);
350 m_outputAttributes =
"[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
352 IProperty *pAsIProp(
nullptr);
354 nullptr == (pAsIProp =
dynamic_cast<IProperty*
>(&*
m_p2BWritten)) ||
356 throw GaudiException(
"Folder property [metadataItemList] not found",
name(), StatusCode::FAILURE);
358 if (
write().isFailure()) {
359 throw GaudiException(
"Cannot write metadata",
name(), StatusCode::FAILURE);
368 throw GaudiException(
"Unable to re-connect services",
name(), StatusCode::FAILURE);
371 if ((pAsIProp->setProperty(
m_itemList)).isFailure()) {
372 throw GaudiException(
"Folder property [itemList] not found",
name(), StatusCode::FAILURE);
384 if (!
m_streamer->finalizeOutput().isSuccess()) {
396 return(StatusCode::FAILURE);
403 return(StatusCode::SUCCESS);
411 if (!
tool->preExecute().isSuccess()) {
416 if (isEventAccepted()) {
417 if (
write().isFailure()) {
423 if(!
tool->postExecute().isSuccess()) {
437 return(StatusCode::FAILURE);
439 return(StatusCode::SUCCESS);
460 IProperty *mstreamer_props =
dynamic_cast<IProperty*
> (&*
m_streamer);
461 IProperty *streamer_props =
dynamic_cast<IProperty*
> (&*streamer);
462 for (
const auto& prop : mstreamer_props->getProperties() ) {
463 ATH_CHECK( streamer_props->setProperty( *prop ) );
465 if( !streamer or streamer->initialize().isFailure()
468 return StatusCode::FAILURE;
482 std::vector<std::unique_ptr<DataObject> > ownedObjects = std::move(
m_ownedObjects );
497 return StatusCode::FAILURE;
502 if (!currentStatus.isSuccess()) {
503 if (!currentStatus.isRecoverable()) {
510 bool doCommit =
false;
516 return(StatusCode::FAILURE);
519 return(StatusCode::SUCCESS);
533 ATH_MSG_WARNING(
"collectAllObjects() could not get ItemList from Tool.");
538 auto vetoes = std::make_unique<SG::SelectionVetoes>();
540 auto compInfo = std::make_unique<SG::CompressionInfo>();
549 if (!vetoes->empty()) {
554 if (!compInfo->empty()) {
558 return StatusCode::SUCCESS;
569 size_t dotpos =
item.key().find(
'.');
570 std::string item_key, aux_attr;
571 if( dotpos != std::string::npos ) {
572 item_key =
item.key().substr(0, dotpos+1);
573 aux_attr =
item.key().substr(dotpos+1);
575 item_key =
item.key();
580 if( aux_attr.size() ) {
587 std::map< unsigned int, std::set< std::string > > comp_attr_map;
592 for(
const auto&
it : comp_attr_map ) {
594 if (
it.second.size() > 0 ) {
595 for(
const auto& attr :
it.second ) {
605 bool gotProxies =
false;
608 if (
match !=
nullptr) {
609 map.insert({item_key,
match});
615 if (!gotProxies && ((*m_currentStore)->proxyRange(remapped_item_id,
iter,
end)).isSuccess()) {
619 bool added =
false, removed =
false;
623 std::string proxyName = itemProxy->
name();
630 bool keyMatch = ( item_key ==
"*" ||
631 item_key == proxyName ||
639 proxyName, item_key, keyMatch));
644 bool xkeyMatch =
false;
652 if (keyMatch && !xkeyMatch) {
658 if (
nullptr != itemProxy->object()) {
662 if( item_id != remapped_item_id ) {
669 auto altbucket = std::make_unique<AltDataBucket>(
677 return StatusCode::FAILURE;
679 }
else if (
item.exact()) {
683 if (!dbb) std::abort();
684 void*
ptr = dbb->
cast (item_id);
690 std::make_unique<AltDataBucket>
699 m_objects.push_back(itemProxy->object());
705 std::stringstream tns;
706 if (!
m_pCLIDSvc->getTypeNameOfID(item_id, tn).isSuccess()) {
708 tns << item_id <<
'_' << proxyName;
710 tn +=
'_' + proxyName;
738 std::string
key = item_key;
739 key.erase (
key.size()-4, 4);
746 for(
const auto&
it :
compression.getCompressedAuxIDs( allVars ) ) {
747 if(
it.second.size() > 0 ) {
748 compInfo[
key ][
it.first ] =
it.second;
750 "lossy float compressed with {} mantissa bits",
751 key,
it.second.size(),
it.first));
759 if (
m_itemSvc->addStreamItem(this->name(),tns.str()).isFailure()) {
763 }
else if (keyMatch && xkeyMatch) {
767 if (!added && !removed) {
769 }
else if (removed) {
777 return StatusCode::SUCCESS;
785 std::set<std::string>
788 const std::string& item_key)
const
791 std::set<std::string>
result;
794 if(item_key.find(
"Aux.") == std::string::npos) {
801 if (
iter.id() != item_id) {
805 size_t seppos =
iter.key().find(
'.');
806 std::string comp_item_key{
""}, comp_str{
""};
807 if(seppos != std::string::npos) {
808 comp_item_key =
iter.key().substr(0, seppos+1);
809 comp_str =
iter.key().substr(seppos+1);
811 comp_item_key =
iter.key();
815 if (!comp_str.empty() && comp_item_key == item_key) {
816 std::stringstream
ss(comp_str);
818 while( std::getline(
ss, attr,
'.') ) {
831 const std::string& tns,
832 const std::string& aux_attr,
837 if( aux_attr.size() ) {
838 std::stringstream
ss(aux_attr);
840 while( std::getline(
ss, attr,
'.') ) {
842 std::stringstream temp;
844 if (
m_itemSvc->addStreamItem(this->name(),temp.str()).isFailure()) {
855 std::string
key = itemProxy.
name();
856 if (
key.size() >= 4 &&
key.compare (
key.size()-4, 4,
"Aux.")==0)
858 key.erase (
key.size()-4, 4);
874 if ( !selected.
test( auxid ) ) {
883 IProperty *pAsIProp(
nullptr);
885 nullptr == (pAsIProp =
dynamic_cast<IProperty*
>(&*
m_p2BWritten)) ||
886 (pAsIProp->setProperty(
m_itemList)).isFailure()) {
887 throw GaudiException(
"Folder property [itemList] not found",
name(), StatusCode::FAILURE);
892 IProperty *pAsIProp(
nullptr);
896 throw GaudiException(
"Folder property [ItemList] not found",
name(), StatusCode::FAILURE);
901 IProperty *pAsIProp(
nullptr);
905 throw GaudiException(
"Folder property [ItemList] not found",
name(), StatusCode::FAILURE);
916 if (!
tool->postInitialize().isSuccess()) {
920 return StatusCode::SUCCESS;
928 if (!
tool->preFinalize().isSuccess()) {
932 const Incident metaDataStopIncident(
name(),
"MetaDataStop");
933 this->
handle(metaDataStopIncident);
938 return StatusCode::SUCCESS;
951 std::unique_ptr<ITPCnvBase> tpcnv =
m_tpCnvSvc->t2p_cnv_unique (clid);
959 const std::string&
text) {
960 size_t pi = 0, ti = 0, star = std::string::npos,
match = 0;
961 while (ti <
text.size()) {
965 }
else if (star != std::string::npos) {
virtual StatusCode finalize() override
std::map< std::string, std::string > m_rangeIDforRangeFN
map of RangeIDs (as used by the Sequencer) for each Range filename generated
void writeMetaData(const std::string &outputFN="")
Write MetaData for this stream (by default) or for a substream outputFN (in ES mode)
std::string m_outputAttributes
Output attributes.
constexpr char AUX_POSTFIX[]
Common post-fix for the names of auxiliary containers in StoreGate.
std::string find(const std::string &s)
return a remapped string
StatusCode addItemObjects(const SG::FolderItem &, SG::SelectionVetoes &vetoes, SG::CompressionInfo &compInfo)
Add item data objects to output streamer list.
A non-templated base class for DataBucket, allows to access the transient object address as a void*.
bool isTransientKey(const std::string &key)
Test to see if a key is transient.
bool fromStorable(DataObject *pDObj, T *&pTrans, bool quiet=false, IRegisterTransient *irt=0, bool isConst=true)
StringArrayProperty m_itemList
Vector of item names.
std::set< std::string > buildCompressionSet(const ToolHandle< SG::IFolder > &handle, const CLID &item_id, const std::string &item_key) const
Helper function for building the compression lists.
ServiceHandle< StoreGateSvc > m_dataStore
Handle to the StoreGateSvc store where the data we want to write out resides.
StringArrayProperty m_compressionListLow
Vector of item names.
virtual void * object()=0
std::unordered_map< std::string, SG::auxid_set_t > SelectionVetoes
Map of vetoed variables.
DataObject * accessData()
Access DataObject on-demand using conversion service.
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
virtual StatusCode initialize() override
BooleanProperty m_forceRead
set to true to force read of data objects in item list
bool isValid() const
called by destructor
void loadDict(CLID clid)
Helper function to load dictionaries (both transient and persistent) for a given type.
#define ATH_MSG_VERBOSE(x)
virtual StatusCode initialize()
T * cast(SG::IRegisterTransient *irt=0, bool isConst=true)
Return the contents of the DataBucket, converted to type T.
StringArrayProperty m_transientItems
List of items that are known to be present in the transient store (and hence we can make input depend...
IDataSelector m_objects
Collection of objects being selected.
StringArrayProperty m_metadataItemList
Vector of item names.
ServiceHandle< StoreGateSvc > * m_currentStore
std::map< std::string, DataProxy * > ProxyMap
StatusCode collectAllObjects()
Collect data objects for output streamer list.
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
StringArrayProperty m_compressionListHigh
Vector of item names.
ToolHandle< IAthenaOutputStreamTool > m_streamer
pointer to AthenaOutputStreamTool
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
Handle class for recording to StoreGate.
size_t auxid_t
Identifier for a particular aux data item.
AthenaOutputStream(const std::string &name, ISvcLocator *pSvcLocator)
Standard algorithm Constructor.
UnsignedIntegerProperty m_compressionBitsHigh
Number of mantissa bits in the float compression.
ServiceHandle< IItemListSvc > m_itemSvc
Handles to all the necessary services.
ConcurrentBitset & insert(bit_t bit, bit_t new_nbits=0)
Set a bit to 1.
std::map< unsigned, std::string > m_slotRangeMap
map of filenames assigned to active slots
Interface providing I/O for a generic auxiliary store.
::StatusCode StatusCode
StatusCode definition for legacy code.
StringProperty m_outputName
Name of the output file.
void compressionListHandlerLow(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
Default, invalid implementation of ClassID_traits.
ToolHandleArray< IAthenaOutputTool > m_helperTools
vector of AlgTools that are executed by this stream
std::vector< std::unique_ptr< DataObject > > m_ownedObjects
Collection of DataObject instances owned by this service.
StatusCode initialize(bool used=true)
If this object is used as a property, then this should be called during the initialize phase.
void compressionListHandlerHigh(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
void handleVariableSelection(const SG::IConstAuxStore &auxstore, SG::DataProxy &itemProxy, const std::string &tns, const std::string &aux_attr, SG::SelectionVetoes &vetoes) const
Here we build the vetoed AuxIDs.
uint32_t CLID
The Class ID type.
virtual StatusCode io_finalize() override
StringProperty m_persName
Name of the persistency service capable to write data from the store.
CLID clID() const
Retrieve clid.
bool m_writeMetadataAndDisconnect
BooleanProperty m_extendProvenanceRecord
Set to false to omit adding the current DataHeader into the DataHeader history. This will cause the in...
BooleanProperty m_itemListFromTool
Set to write out everything in input DataHeader.
ToolHandle< SG::IFolder > m_transient
Decoded list of transient ids.
void finalizeRange(const std::string &rangeFN)
ServiceHandle< IIncidentSvc > m_incidentSvc
std::unordered_map< std::string, SG::ThinningInfo::compression_map_t > CompressionInfo
Map of compressed variables and their compression levels.
void itemListHandler(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
Class helping in dealing with dynamic branch selection.
ServiceHandle< StoreGateSvc > m_metadataStore
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
ServiceHandle< ITPCnvSvc > m_tpCnvSvc
virtual const name_type & name() const override final
Retrieve data object key == string.
StringProperty m_streamName
Stream name (defaults to algorithm name)
UnsignedIntegerProperty m_compressionBitsLow
Number of mantissa bits in the float compression.
IDataSelector m_altObjects
Objects overridden by ‘exact’ handling.
ServiceHandle< IDictLoaderSvc > m_dictLoader
ServiceHandle< MetaDataSvc > m_metaDataSvc
a static registry of CLID->typeName entries. NOT for general use. Use ClassIDSvc instead.
bool hasAlias(const std::string &key) const
Test to see if a given string is in the alias set.
std::atomic< int > m_events
Number of events written to this output stream.
bool simpleMatch(const std::string &pattern, const std::string &text)
Glob-style matcher, where the only meta-character is '*'.
ToolHandle< SG::IFolder > m_compressionDecoderHigh
The top-level folder with items to be compressed high.
mutex_t m_mutex
mutex for this Stream write() and handle() methods
#define ATH_MSG_WARNING(x)
virtual const std::type_info & persistentTInfo() const =0
return C++ type id of the persistent class this converter is for
virtual StatusCode execute() override
std::map< std::string, std::unique_ptr< IAthenaOutputStreamTool > > m_streamerMap
map of streamerTools handling event ranges in MT
Interface for non-const operations on an auxiliary store.
A set of aux data identifiers.
virtual StatusCode write()
Stream the data.
ServiceHandle< IClassIDSvc > m_pCLIDSvc
SG::WriteHandleKey< SG::SelectionVetoes > m_selVetoesKey
Key used for recording selected dynamic variable information to the event store.
Interface for const operations on an auxiliary store.
Interface for const operations on an auxiliary store.
a Folder item (data object) is identified by the clid/key pair
virtual const SG::auxid_set_t & getAuxIDs() const =0
Return a set of identifiers for existing data items in this store.
ProxyMap::const_iterator ConstProxyIterator
bool match(std::string s1, std::string s2)
match the individual directories of two strings
virtual void handle(const Incident &incident) override
Incident service handle listening for MetaDataStop.
ToolHandle< SG::IFolder > m_p2BWritten
The top-level folder with items to be written.
void clearSelection()
Clear list of selected objects.
ToolHandle< SG::IFolder > m_compressionDecoderLow
The top-level folder with items to be compressed low.
virtual ~AthenaOutputStream()
Standard Destructor.
bool test(bit_t bit) const
Test to see if a bit is set.
SG::WriteHandleKey< SG::CompressionInfo > m_compInfoKey
Key used for recording lossy float compressed variable information to the event store.