18#include "GaudiKernel/AlgTool.h"
19#include "GaudiKernel/ClassID.h"
20#include "GaudiKernel/GaudiException.h"
21#include "GaudiKernel/IAlgManager.h"
22#include "GaudiKernel/IIoComponentMgr.h"
23#include "GaudiKernel/IOpaqueAddress.h"
24#include "GaudiKernel/IProperty.h"
25#include "GaudiKernel/ISvcLocator.h"
26#include "GaudiKernel/MsgStream.h"
44 : base_class(name, pSvcLocator),
50 m_streamer(
std::format(
"AthenaOutputStreamTool/{}Tool", name), this)
122 IProperty *pAsIProp =
dynamic_cast<IProperty*
> (&*
m_transient);
125 return StatusCode::FAILURE;
133 const std::string& k = item.key();
134 if (k.find(
'*') != std::string::npos)
continue;
135 if (k.find(
'.') != std::string::npos)
continue;
137 if (titem.id() == item.id() && titem.key() == k) {
138 DataObjID id (item.id(), std::format(
"{}+{}",
m_dataStore.name(), k));
139 this->addDependency (
id, Gaudi::DataHandle::Reader);
149 IProperty *pAsIProp =
dynamic_cast<IProperty*
> (&*
m_transient);
152 return StatusCode::FAILURE;
168 m_incidentSvc->addListener(
this, IncidentType::BeginProcessing, 95);
169 m_incidentSvc->addListener(
this, IncidentType::EndProcessing, 95);
175 ATH_MSG_INFO(std::format(
"Float compression mantissa bits for high compression "
176 "({}) is outside the allowed range of [5, 23].",
182 ATH_MSG_INFO(std::format(
"Float compression mantissa bits for low compression "
183 "({}) is outside the allowed range of [5, 23].",
189 ATH_MSG_ERROR(std::format(
"Float compression mantissa bits for low compression "
190 "({}) is lower than or equal to high compression "
191 "({})! Please check the configuration! ",
194 return StatusCode::FAILURE;
197 ATH_MSG_VERBOSE(
"Both high and low float compression lists are empty. Float compression will NOT be applied.");
199 ATH_MSG_INFO(
"Either high or low (or both) float compression lists are defined. Float compression will be applied.");
200 ATH_MSG_INFO(std::format(
"High compression will use {} mantissa bits, and "
201 "low compression will use {} mantissa bits.",
219 return StatusCode::SUCCESS;
225 ATH_MSG_DEBUG(std::format(
"handle() incident type: {}", inc.type()));
227 std::unique_lock<mutex_t> lock(
m_mutex);
229 if( inc.type() ==
"MetaDataStop" ) {
250 EventContext::ContextID_t slot = inc.context().slot();
251 if( slot == EventContext::INVALID_CONTEXT_ID ) {
252 throw GaudiException(
"Received Incident with invalid slot in ES mode", name(), StatusCode::FAILURE);
254 auto count_events_in_range = [&](
const std::string& range) {
256 [&](
auto& el){return el.second == range;} );
258 if( inc.type() == IncidentType::BeginProcessing ) {
263 if( !rangeFN.empty() and rangeFN != newRangeFN ) {
264 ATH_MSG_INFO(std::format(
"Slot range change: '{}' -> '{}'", rangeFN, newRangeFN));
267 ATH_MSG_DEBUG(std::format(
"Slot: {} FN={}", range.first, range.second));
269 if( count_events_in_range(rangeFN) == 1 ) {
273 ATH_MSG_INFO(std::format(
"slot {} processing event in range: {}", slot, newRangeFN));
278 else if( inc.type() == IncidentType::EndProcessing ) {
281 ATH_MSG_DEBUG(std::format(
"Slot: {} FN={}", range.first, range.second));
287 if( count_events_in_range(rangeFN) == 1 ) {
294 ATH_MSG_DEBUG(std::format(
"Leaving incident handler for {}", inc.type()));
300 ATH_MSG_DEBUG(std::format(
"Writing MetaData to {}", rangeFN));
307 ATH_MSG_INFO(std::format(
"Finished writing Event Sequence to {}", rangeFN));
309 strm_iter->second->finalizeOutput().ignore();
310 strm_iter->second->finalize().ignore();
324 if (!tool->preFinalize().isSuccess()) {
325 throw GaudiException(
"Cannot finalize helper tool", name(), StatusCode::FAILURE);
329 throw GaudiException(
"Failed on MetaDataSvc prepareOutput", name(), StatusCode::FAILURE);
337 size_t pos = DHFWriteIncidentfileName.find(
':');
338 if( pos != std::string::npos ) DHFWriteIncidentfileName = DHFWriteIncidentfileName.substr(pos+1);
339 FileIncident incident(name(),
"WriteDataHeaderForms", DHFWriteIncidentfileName);
346 if (status.isFailure()) {
347 throw GaudiException(
"Unable to connect metadata services", name(), StatusCode::FAILURE);
349 m_outputAttributes =
"[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
351 IProperty *pAsIProp(
nullptr);
353 nullptr == (pAsIProp =
dynamic_cast<IProperty*
>(&*
m_p2BWritten)) ||
355 throw GaudiException(
"Folder property [metadataItemList] not found", name(), StatusCode::FAILURE);
357 if (
write().isFailure()) {
358 throw GaudiException(
"Cannot write metadata", name(), StatusCode::FAILURE);
360 FileIncident incident(name(),
"WriteDataHeaderForms", DHFWriteIncidentfileName +
m_outputAttributes);
366 if (status.isFailure()) {
367 throw GaudiException(
"Unable to re-connect services", name(), StatusCode::FAILURE);
370 if ((pAsIProp->setProperty(
m_itemList)).isFailure()) {
371 throw GaudiException(
"Folder property [itemList] not found", name(), StatusCode::FAILURE);
383 if (!
m_streamer->finalizeOutput().isSuccess()) {
395 return(StatusCode::FAILURE);
402 return(StatusCode::SUCCESS);
410 if (!tool->preExecute().isSuccess()) {
415 if (isEventAccepted()) {
416 if (
write().isFailure()) {
422 if(!tool->postExecute().isSuccess()) {
436 return(StatusCode::FAILURE);
438 return(StatusCode::SUCCESS);
445 std::string outputFN;
447 std::unique_lock<mutex_t> lock(
m_mutex);
452 ATH_MSG_DEBUG(std::format(
"Writing event sequence to {}", outputFN));
459 IProperty *mstreamer_props =
dynamic_cast<IProperty*
> (&*
m_streamer);
460 IProperty *streamer_props =
dynamic_cast<IProperty*
> (&*streamer);
461 if (!mstreamer_props || !streamer_props) {
463 return StatusCode::FAILURE;
465 for (
const auto& prop : mstreamer_props->getProperties() ) {
466 ATH_CHECK( streamer_props->setProperty( *prop ) );
468 if( !streamer or streamer->initialize().isFailure()
470 ATH_MSG_FATAL(std::format(
"Unable to initialize OutputStreamTool for {}", outputFN));
471 return StatusCode::FAILURE;
483 IDataSelector objects = std::move(
m_objects );
485 std::vector<std::unique_ptr<DataObject> > ownedObjects = std::move(
m_ownedObjects );
500 return StatusCode::FAILURE;
502 ATH_MSG_DEBUG(std::format(
"connectOutput done for {}", outputFN));
503 StatusCode currentStatus = streamer->
streamObjects(objects, connectStr);
505 if (!currentStatus.isSuccess()) {
506 if (!currentStatus.isRecoverable()) {
513 bool doCommit =
false;
519 return(StatusCode::FAILURE);
522 return(StatusCode::SUCCESS);
536 ATH_MSG_WARNING(
"collectAllObjects() could not get ItemList from Tool.");
541 auto vetoes = std::make_unique<SG::SelectionVetoes>();
543 auto compInfo = std::make_unique<SG::CompressionInfo>();
552 if (!vetoes->empty()) {
557 if (!compInfo->empty()) {
561 return StatusCode::SUCCESS;
572 size_t dotpos = item.key().find(
'.');
573 std::string item_key, aux_attr;
574 if( dotpos != std::string::npos ) {
575 item_key = item.key().substr(0, dotpos+1);
576 aux_attr = item.key().substr(dotpos+1);
578 item_key = item.key();
580 CLID item_id = item.id();
581 ATH_MSG_DEBUG(std::format(
"addItemObjects({},\"{}\") called", item_id, item_key));
583 if( aux_attr.size() ) {
590 std::map< unsigned int, std::set< std::string > > comp_attr_map;
595 for(
const auto& it : comp_attr_map ) {
596 ATH_MSG_DEBUG(std::format(
" Comp Attr {} with {} mantissa bits.", it.second.size(), it.first));
597 if ( it.second.size() > 0 ) {
598 for(
const auto& attr : it.second ) {
608 bool gotProxies =
false;
611 if (
match !=
nullptr) {
618 if (!gotProxies && ((*m_currentStore)->proxyRange(remapped_item_id, iter, end)).isSuccess()) {
622 bool added =
false, removed =
false;
624 for (; iter != end; ++iter) {
626 std::string proxyName = itemProxy->
name();
633 bool keyMatch = ( item_key ==
"*" ||
634 item_key == proxyName ||
641 ATH_MSG_DEBUG(std::format(
"Result of checking {} against {} to see if it matches is {}",
642 proxyName, item_key, keyMatch));
647 bool xkeyMatch =
false;
655 if (keyMatch && !xkeyMatch) {
658 ATH_MSG_ERROR(std::format(
" Could not get data object for id {},\"{}\"", remapped_item_id, proxyName));
661 if (
nullptr != itemProxy->object()) {
665 if( item_id != remapped_item_id ) {
672 auto altbucket = std::make_unique<AltDataBucket>(
678 ATH_MSG_ERROR(std::format(
"Failed to retrieve object from MetaCont with key={}, for EventRangeID={}",
680 return StatusCode::FAILURE;
682 }
else if (item.exact()) {
686 if (!dbb) std::abort();
687 void* ptr = dbb->
cast (item_id);
693 std::make_unique<AltDataBucket>
702 m_objects.push_back(itemProxy->object());
703 ATH_MSG_DEBUG(std::format(
" Added object {},\"{}\"", item_id, proxyName));
715 }
catch(
const std::exception& ) {
716 ATH_MSG_DEBUG(std::format(
"Error in casting object with CLID {} to SG::IConstAuxStore*", itemProxy->
clID()));
729 std::string key = item_key;
730 key.erase (key.size()-4, 4);
738 if( it.second.size() > 0 ) {
739 compInfo[ key ][ it.first ] = it.second;
740 ATH_MSG_DEBUG(std::format(
"Container {} has {} variables that'll be "
741 "lossy float compressed with {} mantissa bits",
742 key, it.second.size(), it.first));
751 }
else if (keyMatch && xkeyMatch) {
755 if (!added && !removed) {
756 ATH_MSG_DEBUG(std::format(
" No object matching {},\"{}\" found", item_id, item_key));
757 }
else if (removed) {
758 ATH_MSG_DEBUG(std::format(
" Object being excluded based on property setting {},\"{}\". Skipping",
762 ATH_MSG_DEBUG(std::format(
" Failed to receive proxy iterators from StoreGate for {},\"{}\". Skipping",
765 return StatusCode::SUCCESS;
776 const std::string& item_key)
const
779 std::set<std::string>
result;
782 if(item_key.find(
"Aux.") == std::string::npos) {
787 for (
const auto& iter : *
handle) {
789 if (iter.id() != item_id) {
793 size_t seppos = iter.key().find(
'.');
794 std::string comp_item_key{
""}, comp_str{
""};
795 if(seppos != std::string::npos) {
796 comp_item_key = iter.key().substr(0, seppos+1);
797 comp_str = iter.key().substr(seppos+1);
799 comp_item_key = iter.key();
803 if (!comp_str.empty() && comp_item_key == item_key) {
804 std::stringstream
ss(comp_str);
806 while( std::getline(
ss, attr,
'.') ) {
819 const std::string& aux_attr,
823 std::set<std::string> attributes;
824 if( aux_attr.size() ) {
825 std::stringstream
ss(aux_attr);
827 while( std::getline(
ss, attr,
'.') ) {
828 attributes.insert(attr);
833 if (attributes.empty()) {
837 std::string key = itemProxy.
name();
838 if (key.size() >= 4 && key.compare (key.size()-4, 4,
"Aux.")==0)
840 key.erase (key.size()-4, 4);
848 sel.selectAux (attributes);
856 if ( !selected.
test( auxid ) ) {
865 IProperty *pAsIProp(
nullptr);
867 nullptr == (pAsIProp =
dynamic_cast<IProperty*
>(&*
m_p2BWritten)) ||
868 (pAsIProp->setProperty(
m_itemList)).isFailure()) {
869 throw GaudiException(
"Folder property [itemList] not found", name(), StatusCode::FAILURE);
874 IProperty *pAsIProp(
nullptr);
878 throw GaudiException(
"Folder property [ItemList] not found", name(), StatusCode::FAILURE);
883 IProperty *pAsIProp(
nullptr);
887 throw GaudiException(
"Folder property [ItemList] not found", name(), StatusCode::FAILURE);
898 if (!tool->postInitialize().isSuccess()) {
902 return StatusCode::SUCCESS;
910 if (!tool->preFinalize().isSuccess()) {
914 const Incident metaDataStopIncident(name(),
"MetaDataStop");
915 this->
handle(metaDataStopIncident);
920 return StatusCode::SUCCESS;
933 std::unique_ptr<ITPCnvBase> tpcnv =
m_tpCnvSvc->t2p_cnv_unique (clid);
941 const std::string& text) {
942 size_t pi = 0, ti = 0, star = std::string::npos,
match = 0;
943 while (ti < text.size()) {
944 if (
pi < pattern.size() && (pattern[
pi] == text[ti] || pattern[
pi] ==
'*')) {
945 if (pattern[
pi] ==
'*') { star =
pi++;
match = ti; }
947 }
else if (star != std::string::npos) {
952 while (
pi < pattern.size() && pattern[
pi] ==
'*') ++
pi;
953 return pi == pattern.size();
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
a static registry of CLID->typeName entries.
uint32_t CLID
The Class ID type.
Interface providing I/O for a generic auxiliary store.
Interface for non-const operations on an auxiliary store.
Interface for const operations on an auxiliary store.
Handle class for recording to StoreGate.
IDataSelector m_objects
Collection of objects being selected.
ServiceHandle< StoreGateSvc > m_metadataStore
virtual StatusCode finalize() override
StringProperty m_streamName
Stream name (defaults to algorithm name)
UnsignedIntegerProperty m_compressionBitsLow
Number of mantissa bits in the float compression.
UnsignedIntegerProperty m_compressionBitsHigh
Number of mantissa bits in the float compression.
StringArrayProperty m_transientItems
List of items that are known to be present in the transient store (and hence we can make input depend...
ToolHandle< SG::IFolder > m_compressionDecoderLow
The top-level folder with items to be compressed low.
ToolHandle< SG::IFolder > m_p2BWritten
The top-level folder with items to be written.
StringArrayProperty m_compressionListLow
Vector of item names.
ToolHandle< IAthenaOutputStreamTool > m_streamer
pointer to AthenaOutputStreamTool
ServiceHandle< MetaDataSvc > m_metaDataSvc
Handles to all the necessary services.
BooleanProperty m_itemListFromTool
Set to write out everything in input DataHeader.
StringProperty m_outputName
Name of the output file.
virtual ~AthenaOutputStream()
Standard Destructor.
std::map< std::string, std::unique_ptr< IAthenaOutputStreamTool > > m_streamerMap
map of streamerTools handling event ranges in MT
std::map< unsigned, std::string > m_slotRangeMap
map of filenames assigned to active slots
virtual void handle(const Incident &incident) override
Incident service handle listening for MetaDataStop.
std::set< std::string > buildCompressionSet(const ToolHandle< SG::IFolder > &handle, const CLID &item_id, const std::string &item_key) const
Helper function for building the compression lists.
AthenaOutputStream(const std::string &name, ISvcLocator *pSvcLocator)
Standard algorithm Constructor.
ToolHandleArray< IAthenaOutputTool > m_helperTools
vector of AlgTools that are executed by this stream
ToolHandle< SG::IFolder > m_compressionDecoderHigh
The top-level folder with items to be compressed high.
void clearSelection()
Clear list of selected objects.
StringArrayProperty m_itemList
Vector of item names.
ServiceHandle< IClassIDSvc > m_pCLIDSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
void handleVariableSelection(const SG::IConstAuxStore &auxstore, SG::DataProxy &itemProxy, const std::string &aux_attr, SG::SelectionVetoes &vetoes) const
Here we build the vetoed AuxIDs.
ServiceHandle< IDictLoaderSvc > m_dictLoader
IDataSelector m_altObjects
Objects overridden by ‘exact’ handling.
void compressionListHandlerLow(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
StatusCode addItemObjects(const SG::FolderItem &, SG::SelectionVetoes &vetoes, SG::CompressionInfo &compInfo)
Add item data objects to output streamer list.
StringProperty m_persName
Name of the persistency service capable to write data from the store.
BooleanProperty m_forceRead
set to true to force read of data objects in item list
void loadDict(CLID clid)
Helper function to load dictionaries (both transient and persistent) for a given type.
bool m_writeMetadataAndDisconnect
ServiceHandle< ITPCnvSvc > m_tpCnvSvc
StatusCode collectAllObjects()
Collect data objects for output streamer list.
std::string m_outputAttributes
Output attributes.
StringArrayProperty m_compressionListHigh
Vector of item names.
ServiceHandle< StoreGateSvc > * m_currentStore
StringArrayProperty m_metadataItemList
Vector of item names.
std::vector< std::unique_ptr< DataObject > > m_ownedObjects
Collection of DataObject instances owned by this service.
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
std::map< std::string, std::string > m_rangeIDforRangeFN
map of RangeIDs (as used by the Sequencer) for each Range filename generated
ToolHandle< SG::IFolder > m_transient
Decoded list of transient ids.
ServiceHandle< StoreGateSvc > m_dataStore
Handle to the StoreGateSvc store where the data we want to write out resides.
virtual StatusCode initialize() override
mutex_t m_mutex
mutex for this Stream write() and handle() methods
virtual StatusCode execute() override
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
SG::WriteHandleKey< SG::CompressionInfo > m_compInfoKey
Key used for recording lossy float compressed variable information to the event store.
bool simpleMatch(const std::string &pattern, const std::string &text)
Glob-style matcher, where the only meta-character is '*'.
void compressionListHandlerHigh(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
SG::WriteHandleKey< SG::SelectionVetoes > m_selVetoesKey
Key used for recording selected dynamic variable information to the event store.
virtual StatusCode io_finalize() override
virtual StatusCode write()
Stream the data.
BooleanProperty m_extendProvenanceRecord
Set to false to omit adding the current DataHeader into the DataHeader history. This will cause the in...
void finalizeRange(const std::string &rangeFN)
std::atomic< int > m_events
Number of events written to this output stream.
void itemListHandler(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
void writeMetaData(const std::string &outputFN="")
Write MetaData for this stream (by default) or for a substream outputFN (in ES mode)
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
bool test(bit_t bit) const
Test to see if a bit is set.
ConcurrentBitset & insert(bit_t bit, bit_t new_nbits=0)
Set a bit to 1.
A non-templated base class for DataBucket, allows to access the transient object address as a void*.
virtual void * object()=0
T * cast(SG::IRegisterTransient *irt=0, bool isConst=true)
Return the contents of the DataBucket, converted to type T.
virtual StatusCode initialize()
CLID clID() const
Retrieve clid.
virtual const name_type & name() const override final
Retrieve data object key == string.
bool isValid() const
called by destructor
DataObject * accessData()
Access DataObject on-demand using conversion service.
bool hasAlias(const std::string &key) const
Test to see if a given string is in the alias set.
a Folder item (data object) is identified by the clid/key pair
Interface for const operations on an auxiliary store.
virtual const SG::auxid_set_t & getAuxIDs() const =0
Return a set of identifiers for existing data items in this store.
A set of aux data identifiers.
virtual std::map< unsigned int, SG::auxid_set_t > getCompressedAuxIDs(const SG::auxid_set_t &fullset) const
Return those variables that are selected to be compressed per compression setting.
virtual void setCompressedAuxIDs(const std::map< unsigned int, std::set< std::string > > &attributes)
Set which variables should be compressed per compression setting.
Class helping in dealing with dynamic branch selection.
bool match(std::string s1, std::string s2)
match the individual directories of two strings
constexpr char AUX_POSTFIX[]
Common post-fix for the names of auxiliary containers in StoreGate.
std::unordered_map< std::string, SG::auxid_set_t > SelectionVetoes
Map of vetoed variables.
std::map< std::string, DataProxy * > ProxyMap
bool fromStorable(DataObject *pDObj, T *&pTrans, bool quiet=false, IRegisterTransient *irt=0, bool isConst=true)
ProxyMap::const_iterator ConstProxyIterator
std::unordered_map< std::string, SG::ThinningInfo::compression_map_t > CompressionInfo
Map of compressed variables and their compression levels.
bool isTransientKey(const std::string &key)
Test to see if a key is transient.
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
size_t auxid_t
Identifier for a particular aux data item.