 |
ATLAS Offline Software
|
Go to the documentation of this file.
18 #include "GaudiKernel/AlgTool.h"
19 #include "GaudiKernel/ClassID.h"
20 #include "GaudiKernel/GaudiException.h"
21 #include "GaudiKernel/IAlgManager.h"
22 #include "GaudiKernel/IIoComponentMgr.h"
23 #include "GaudiKernel/IOpaqueAddress.h"
24 #include "GaudiKernel/IProperty.h"
25 #include "GaudiKernel/ISvcLocator.h"
26 #include "GaudiKernel/MsgStream.h"
44 : base_class(
name, pSvcLocator),
45 m_currentStore(&m_dataStore),
46 m_p2BWritten(std::
format(
"SG::Folder/{}_TopFolder",
name), this),
47 m_compressionDecoderHigh(std::
format(
"SG::Folder/{}_compressed_high",
name), this),
48 m_compressionDecoderLow(std::
format(
"SG::Folder/{}_compressed_low",
name), this),
49 m_transient(std::
format(
"SG::Folder/{}_transient",
name), this),
50 m_streamer(std::
format(
"AthenaOutputStreamTool/{}Tool",
name), this)
122 IProperty *pAsIProp =
dynamic_cast<IProperty*
> (&*
m_transient);
125 return StatusCode::FAILURE;
133 const std::string&
k =
item.key();
134 if (
k.find(
'*') != std::string::npos)
continue;
135 if (
k.find(
'.') != std::string::npos)
continue;
137 if (titem.id() ==
item.id() && titem.key() ==
k) {
149 IProperty *pAsIProp =
dynamic_cast<IProperty*
> (&*
m_transient);
152 return StatusCode::FAILURE;
168 m_incidentSvc->addListener(
this, IncidentType::BeginProcessing, 95);
169 m_incidentSvc->addListener(
this, IncidentType::EndProcessing, 95);
174 if(m_compressionBitsHigh < 5u || m_compressionBitsHigh > 23
u) {
176 "({}) is outside the allowed range of [5, 23].",
181 if(m_compressionBitsLow < 5u || m_compressionBitsLow > 23
u) {
183 "({}) is outside the allowed range of [5, 23].",
190 "({}) is lower than or equal to high compression "
191 "({})! Please check the configuration! ",
194 return StatusCode::FAILURE;
197 ATH_MSG_VERBOSE(
"Both high and low float compression lists are empty. Float compression will NOT be applied.");
199 ATH_MSG_INFO(
"Either high or low (or both) float compression lists are defined. Float compression will be applied.");
201 "low compression will use {} mantissa bits.",
219 return StatusCode::SUCCESS;
229 if( inc.type() ==
"MetaDataStop" ) {
250 EventContext::ContextID_t slot = inc.context().slot();
251 if( slot == EventContext::INVALID_CONTEXT_ID ) {
252 throw GaudiException(
"Received Incident with invalid slot in ES mode",
name(), StatusCode::FAILURE);
254 auto count_events_in_range = [&](
const std::string&
range) {
256 [&](
auto&
el){return el.second == range;} );
258 if( inc.type() == IncidentType::BeginProcessing ) {
263 if( !rangeFN.empty() and rangeFN != newRangeFN ) {
269 if( count_events_in_range(rangeFN) == 1 ) {
278 else if( inc.type() == IncidentType::EndProcessing ) {
287 if( count_events_in_range(rangeFN) == 1 ) {
309 strm_iter->second->finalizeOutput().ignore();
310 strm_iter->second->finalize().ignore();
324 if (!
tool->preFinalize().isSuccess()) {
325 throw GaudiException(
"Cannot finalize helper tool",
name(), StatusCode::FAILURE);
329 throw GaudiException(
"Failed on MetaDataSvc prepareOutput",
name(), StatusCode::FAILURE);
337 size_t pos = DHFWriteIncidentfileName.find(
':');
338 if(
pos != std::string::npos ) DHFWriteIncidentfileName = DHFWriteIncidentfileName.substr(
pos+1);
339 FileIncident incident(
name(),
"WriteDataHeaderForms", DHFWriteIncidentfileName);
347 throw GaudiException(
"Unable to connect metadata services",
name(), StatusCode::FAILURE);
349 m_outputAttributes =
"[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
351 IProperty *pAsIProp(
nullptr);
353 nullptr == (pAsIProp =
dynamic_cast<IProperty*
>(&*
m_p2BWritten)) ||
355 throw GaudiException(
"Folder property [metadataItemList] not found",
name(), StatusCode::FAILURE);
357 if (
write().isFailure()) {
358 throw GaudiException(
"Cannot write metadata",
name(), StatusCode::FAILURE);
367 throw GaudiException(
"Unable to re-connect services",
name(), StatusCode::FAILURE);
370 if ((pAsIProp->setProperty(
m_itemList)).isFailure()) {
371 throw GaudiException(
"Folder property [itemList] not found",
name(), StatusCode::FAILURE);
383 if (!
m_streamer->finalizeOutput().isSuccess()) {
395 return(StatusCode::FAILURE);
402 return(StatusCode::SUCCESS);
410 if (!
tool->preExecute().isSuccess()) {
415 if (isEventAccepted()) {
416 if (
write().isFailure()) {
422 if(!
tool->postExecute().isSuccess()) {
436 return(StatusCode::FAILURE);
438 return(StatusCode::SUCCESS);
459 IProperty *mstreamer_props =
dynamic_cast<IProperty*
> (&*
m_streamer);
460 IProperty *streamer_props =
dynamic_cast<IProperty*
> (&*streamer);
461 if (!mstreamer_props || !streamer_props) {
463 return StatusCode::FAILURE;
465 for (
const auto& prop : mstreamer_props->getProperties() ) {
466 ATH_CHECK( streamer_props->setProperty( *prop ) );
468 if( !streamer or streamer->initialize().isFailure()
471 return StatusCode::FAILURE;
485 std::vector<std::unique_ptr<DataObject> > ownedObjects = std::move(
m_ownedObjects );
500 return StatusCode::FAILURE;
505 if (!currentStatus.isSuccess()) {
506 if (!currentStatus.isRecoverable()) {
513 bool doCommit =
false;
519 return(StatusCode::FAILURE);
522 return(StatusCode::SUCCESS);
536 ATH_MSG_WARNING(
"collectAllObjects() could not get ItemList from Tool.");
541 auto vetoes = std::make_unique<SG::SelectionVetoes>();
543 auto compInfo = std::make_unique<SG::CompressionInfo>();
552 if (!vetoes->empty()) {
557 if (!compInfo->empty()) {
561 return StatusCode::SUCCESS;
572 size_t dotpos =
item.key().find(
'.');
573 std::string item_key, aux_attr;
574 if( dotpos != std::string::npos ) {
575 item_key =
item.key().substr(0, dotpos+1);
576 aux_attr =
item.key().substr(dotpos+1);
578 item_key =
item.key();
583 if( aux_attr.size() ) {
590 std::map< unsigned int, std::set< std::string > > comp_attr_map;
595 for(
const auto&
it : comp_attr_map ) {
597 if (
it.second.size() > 0 ) {
598 for(
const auto& attr :
it.second ) {
608 bool gotProxies =
false;
611 if (
match !=
nullptr) {
612 map.insert({item_key,
match});
618 if (!gotProxies && ((*m_currentStore)->proxyRange(remapped_item_id,
iter,
end)).isSuccess()) {
622 bool added =
false, removed =
false;
626 std::string proxyName = itemProxy->
name();
633 bool keyMatch = ( item_key ==
"*" ||
634 item_key == proxyName ||
642 proxyName, item_key, keyMatch));
647 bool xkeyMatch =
false;
655 if (keyMatch && !xkeyMatch) {
661 if (
nullptr != itemProxy->object()) {
665 if( item_id != remapped_item_id ) {
672 auto altbucket = std::make_unique<AltDataBucket>(
680 return StatusCode::FAILURE;
682 }
else if (
item.exact()) {
686 if (!dbb) std::abort();
687 void*
ptr = dbb->
cast (item_id);
693 std::make_unique<AltDataBucket>
702 m_objects.push_back(itemProxy->object());
729 std::string
key = item_key;
730 key.erase (
key.size()-4, 4);
737 for(
const auto&
it :
compression.getCompressedAuxIDs( allVars ) ) {
738 if(
it.second.size() > 0 ) {
739 compInfo[
key ][
it.first ] =
it.second;
741 "lossy float compressed with {} mantissa bits",
742 key,
it.second.size(),
it.first));
751 }
else if (keyMatch && xkeyMatch) {
755 if (!added && !removed) {
757 }
else if (removed) {
765 return StatusCode::SUCCESS;
773 std::set<std::string>
776 const std::string& item_key)
const
779 std::set<std::string>
result;
782 if(item_key.find(
"Aux.") == std::string::npos) {
789 if (
iter.id() != item_id) {
793 size_t seppos =
iter.key().find(
'.');
794 std::string comp_item_key{
""}, comp_str{
""};
795 if(seppos != std::string::npos) {
796 comp_item_key =
iter.key().substr(0, seppos+1);
797 comp_str =
iter.key().substr(seppos+1);
799 comp_item_key =
iter.key();
803 if (!comp_str.empty() && comp_item_key == item_key) {
804 std::stringstream
ss(comp_str);
806 while( std::getline(
ss, attr,
'.') ) {
819 const std::string& aux_attr,
824 if( aux_attr.size() ) {
825 std::stringstream
ss(aux_attr);
827 while( std::getline(
ss, attr,
'.') ) {
837 std::string
key = itemProxy.
name();
838 if (
key.size() >= 4 &&
key.compare (
key.size()-4, 4,
"Aux.")==0)
840 key.erase (
key.size()-4, 4);
856 if ( !selected.
test( auxid ) ) {
865 IProperty *pAsIProp(
nullptr);
867 nullptr == (pAsIProp =
dynamic_cast<IProperty*
>(&*
m_p2BWritten)) ||
868 (pAsIProp->setProperty(
m_itemList)).isFailure()) {
869 throw GaudiException(
"Folder property [itemList] not found",
name(), StatusCode::FAILURE);
874 IProperty *pAsIProp(
nullptr);
878 throw GaudiException(
"Folder property [ItemList] not found",
name(), StatusCode::FAILURE);
883 IProperty *pAsIProp(
nullptr);
887 throw GaudiException(
"Folder property [ItemList] not found",
name(), StatusCode::FAILURE);
898 if (!
tool->postInitialize().isSuccess()) {
902 return StatusCode::SUCCESS;
910 if (!
tool->preFinalize().isSuccess()) {
914 const Incident metaDataStopIncident(
name(),
"MetaDataStop");
915 this->
handle(metaDataStopIncident);
920 return StatusCode::SUCCESS;
933 std::unique_ptr<ITPCnvBase> tpcnv =
m_tpCnvSvc->t2p_cnv_unique (clid);
941 const std::string&
text) {
942 size_t pi = 0, ti = 0, star = std::string::npos,
match = 0;
943 while (ti <
text.size()) {
947 }
else if (star != std::string::npos) {
virtual StatusCode finalize() override
std::map< std::string, std::string > m_rangeIDforRangeFN
map of RangeIDs (as used by the Sequencer) for each Range filename generated
void writeMetaData(const std::string &outputFN="")
Write MetaData for this stream (by default) or for a substream outputFN (in ES mode)
std::string m_outputAttributes
Output attributes.
constexpr char AUX_POSTFIX[]
Common post-fix for the names of auxiliary containers in StoreGate.
std::string find(const std::string &s)
return a remapped string
StatusCode addItemObjects(const SG::FolderItem &, SG::SelectionVetoes &vetoes, SG::CompressionInfo &compInfo)
Add item data objects to output streamer list.
A non-templated base class for DataBucket, allows to access the transient object address as a void*.
bool isTransientKey(const std::string &key)
Test to see if a key is transoent.
bool fromStorable(DataObject *pDObj, T *&pTrans, bool quiet=false, IRegisterTransient *irt=0, bool isConst=true)
StringArrayProperty m_itemList
Vector of item names.
std::set< std::string > buildCompressionSet(const ToolHandle< SG::IFolder > &handle, const CLID &item_id, const std::string &item_key) const
Helper function for building the compression lists.
ServiceHandle< StoreGateSvc > m_dataStore
Handle to the StoreGateSvc store where the data we want to write out resides.
StringArrayProperty m_compressionListLow
Vector of item names.
virtual void * object()=0
std::unordered_map< std::string, SG::auxid_set_t > SelectionVetoes
Map of vetoed variables.
DataObject * accessData()
Access DataObject on-demand using conversion service.
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
virtual StatusCode initialize() override
BooleanProperty m_forceRead
set to true to force read of data objects in item list
bool isValid() const
called by destructor
void loadDict(CLID clid)
Helper function to load dictionaries (both transient and persistent) for a given type.
#define ATH_MSG_VERBOSE(x)
virtual StatusCode initialize()
T * cast(SG::IRegisterTransient *irt=0, bool isConst=true)
Return the contents of the DataBucket, converted to type T.
StringArrayProperty m_transientItems
List of items that are known to be present in the transient store (and hence we can make input depend...
IDataSelector m_objects
Collection of objects being selected.
StringArrayProperty m_metadataItemList
Vector of item names.
ServiceHandle< StoreGateSvc > * m_currentStore
@ u
Enums for curvilinear frames.
std::map< std::string, DataProxy * > ProxyMap
StatusCode collectAllObjects()
Collect data objects for output streamer list.
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
StringArrayProperty m_compressionListHigh
Vector of item names.
ToolHandle< IAthenaOutputStreamTool > m_streamer
pointer to AthenaOutputStreamTool
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
Handle class for recording to StoreGate.
size_t auxid_t
Identifier for a particular aux data item.
AthenaOutputStream(const std::string &name, ISvcLocator *pSvcLocator)
Standard algorithm Constructor.
UnsignedIntegerProperty m_compressionBitsHigh
Number of mantissa bits in the float compression.
ConcurrentBitset & insert(bit_t bit, bit_t new_nbits=0)
Set a bit to 1.
std::map< unsigned, std::string > m_slotRangeMap
map of filenames assigned to active slots
Interface providing I/O for a generic auxiliary store.
::StatusCode StatusCode
StatusCode definition for legacy code.
StringProperty m_outputName
Name of the output file.
void compressionListHandlerLow(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
Default, invalid implementation of ClassID_traits.
ToolHandleArray< IAthenaOutputTool > m_helperTools
vector of AlgTools that that are executed by this stream
std::vector< std::unique_ptr< DataObject > > m_ownedObjects
Collection of DataObject instances owned by this service.
StatusCode initialize(bool used=true)
If this object is used as a property, then this should be called during the initialize phase.
void compressionListHandlerHigh(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
uint32_t CLID
The Class ID type.
virtual StatusCode io_finalize() override
StringProperty m_persName
Name of the persistency service capable to write data from the store.
CLID clID() const
Retrieve clid.
bool m_writeMetadataAndDisconnect
BooleanProperty m_extendProvenanceRecord
Set to false to omit adding the current DataHeader into the DataHeader history This will cause the in...
BooleanProperty m_itemListFromTool
Set to write out everything in input DataHeader.
ToolHandle< SG::IFolder > m_transient
Decoded list of transient ids.
void finalizeRange(const std::string &rangeFN)
ServiceHandle< IIncidentSvc > m_incidentSvc
std::unordered_map< std::string, SG::ThinningInfo::compression_map_t > CompressionInfo
Map of compressed variables and their compression levels.
void itemListHandler(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
Class helping in dealing with dynamic branch selection.
void handleVariableSelection(const SG::IConstAuxStore &auxstore, SG::DataProxy &itemProxy, const std::string &aux_attr, SG::SelectionVetoes &vetoes) const
Here we build the vetoed AuxIDs.
ServiceHandle< StoreGateSvc > m_metadataStore
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
ServiceHandle< ITPCnvSvc > m_tpCnvSvc
virtual const name_type & name() const override final
Retrieve data object key == string.
StringProperty m_streamName
Stream name (defaults to algorithm name)
UnsignedIntegerProperty m_compressionBitsLow
Number of mantissa bits in the float compression.
IDataSelector m_altObjects
Objects overridden by ‘exact’ handling.
ServiceHandle< IDictLoaderSvc > m_dictLoader
ServiceHandle< MetaDataSvc > m_metaDataSvc
Handles to all the necessary services.
a static registry of CLID->typeName entries. NOT for general use. Use ClassIDSvc instead.
bool hasAlias(const std::string &key) const
Test to see if a given string is in the alias set.
std::atomic< int > m_events
Number of events written to this output stream.
bool simpleMatch(const std::string &pattern, const std::string &text)
Glob-style matcher, where the only meta-character is '*'.
ToolHandle< SG::IFolder > m_compressionDecoderHigh
The top-level folder with items to be compressed high.
mutex_t m_mutex
mutex for this Stream write() and handle() methods
#define ATH_MSG_WARNING(x)
virtual const std::type_info & persistentTInfo() const =0
return C++ type id of the persistent class this converter is for
virtual StatusCode execute() override
std::map< std::string, std::unique_ptr< IAthenaOutputStreamTool > > m_streamerMap
map of streamerTools handling event ranges in MT
Interface for non-const operations on an auxiliary store.
A set of aux data identifiers.
virtual StatusCode write()
Stream the data.
ServiceHandle< IClassIDSvc > m_pCLIDSvc
SG::WriteHandleKey< SG::SelectionVetoes > m_selVetoesKey
Key used for recording selected dynamic variable information to the event store.
Interface for const operations on an auxiliary store.
Interface for const operations on an auxiliary store.
a Folder item (data object) is identified by the clid/key pair
virtual const SG::auxid_set_t & getAuxIDs() const =0
Return a set of identifiers for existing data items in this store.
ProxyMap::const_iterator ConstProxyIterator
bool match(std::string s1, std::string s2)
match the individual directories of two strings
virtual void handle(const Incident &incident) override
Incident service handle listening for MetaDataStop.
ToolHandle< SG::IFolder > m_p2BWritten
The top-level folder with items to be written.
void clearSelection()
Clear list of selected objects.
ToolHandle< SG::IFolder > m_compressionDecoderLow
The top-level folder with items to be compressed low.
virtual ~AthenaOutputStream()
Standard Destructor.
bool test(bit_t bit) const
Test to see if a bit is set.
SG::WriteHandleKey< SG::CompressionInfo > m_compInfoKey
Key used for recording lossy float compressed variable information to the event store.