ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaOutputStream.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
6
7// STL include files
8#include <cassert>
9#include <format>
10#include <sstream>
11
12// Framework include files
18#include "GaudiKernel/AlgTool.h"
19#include "GaudiKernel/ClassID.h"
20#include "GaudiKernel/GaudiException.h"
21#include "GaudiKernel/IAlgManager.h"
22#include "GaudiKernel/IIoComponentMgr.h"
23#include "GaudiKernel/IOpaqueAddress.h"
24#include "GaudiKernel/IProperty.h"
25#include "GaudiKernel/ISvcLocator.h"
26#include "GaudiKernel/MsgStream.h"
29#include "SGTools/DataProxy.h"
30#include "SGTools/ProxyMap.h"
31#include "SGTools/SGIFolder.h"
38
39// Local include files
40#include "AltDataBucket.h"
41
42// Standard Constructor
// Construct the output stream algorithm.
// The folder tool handles (top folder, high/low compression decoders and the
// transient folder) and the streamer tool handle are all created with this
// algorithm as parent, and their "Type/Name" strings are derived from the
// algorithm name.
// @param name         instance name of the algorithm
// @param pSvcLocator  service locator; must not be null (checked by assert only)
43AthenaOutputStream::AthenaOutputStream(const std::string& name, ISvcLocator* pSvcLocator)
44 : base_class(name, pSvcLocator),
46 m_p2BWritten(std::format("SG::Folder/{}_TopFolder", name), this),
47 m_compressionDecoderHigh(std::format("SG::Folder/{}_compressed_high", name), this),
48 m_compressionDecoderLow(std::format("SG::Folder/{}_compressed_low", name), this),
49 m_transient(std::format("SG::Folder/{}_transient", name), this),
50 m_streamer(std::format("AthenaOutputStreamTool/{}Tool", name), this)
51{
52 // Ensure the service locater is good
53 assert(pSvcLocator);
54
55 // This property depends on the name that's known at construction time
56 // Therefore, do it the old fashioned way
57 declareProperty("WritingTool", m_streamer);
58
59 // Associate action handlers with the AcceptAlgs,
60 // RequireAlgs & VetoAlgs properties
    // NOTE(review): the comment above names AcceptAlgs/RequireAlgs/VetoAlgs, but
    // the handler registered on the next line belongs to m_itemList: every
    // update of the item list property triggers itemListHandler().
61 m_itemList.declareUpdateHandler(&AthenaOutputStream::itemListHandler, this);
64}
65
66// Standard Destructor
68 // Clear the internal caches
69 m_streamerMap.clear();
70}
71
72// Initialize data writer
74 ATH_MSG_DEBUG("In initialize");
75
76 // Initialize the FilteredAlgorithm base
78
79 // Reset the number of events written
80 m_events = 0;
81
82 // Set up the SG services
83 ATH_CHECK( m_dataStore.retrieve() );
84 ATH_MSG_DEBUG(std::format("Found {} store.", m_dataStore.typeAndName()));
85 if (!m_metadataItemList.value().empty()) {
86 ATH_CHECK( m_metadataStore.retrieve() );
87 ATH_MSG_DEBUG(std::format("Found {} store.", m_metadataStore.typeAndName()));
88 }
89
90 // Set up various services
91 ATH_CHECK( m_pCLIDSvc.retrieve() );
92 ATH_CHECK( m_dictLoader.retrieve() );
93 ATH_CHECK( m_tpCnvSvc.retrieve() );
94 ATH_CHECK( m_outSeqSvc.retrieve() );
95
96 // Get Output Stream tool for writing
97 ATH_CHECK( m_streamer.retrieve() );
98 ATH_CHECK( m_streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord) );
99
100 ATH_CHECK( m_helperTools.retrieve() );
101 ATH_MSG_INFO("Found " << m_helperTools);
102 ATH_MSG_INFO(std::format("Data output: {}", m_outputName.toString()));
103
104 for (auto& tool : m_helperTools) {
105 ATH_CHECK( tool->postInitialize() );
106 }
107
108 // Register this algorithm for 'I/O' events
109 ServiceHandle<IIoComponentMgr> iomgr("IoComponentMgr", name());
110 ATH_CHECK( iomgr.retrieve() );
111 ATH_CHECK( iomgr->io_register(this) );
112 ATH_CHECK( iomgr->io_register(this, IIoComponentMgr::IoMode::WRITE, m_outputName) );
113 ATH_CHECK( this->io_reinit() );
114
115 // Add an explicit input dependency for everything in our item list
116 // that we know from the configuration is in the transient store.
117 // We don't want to add everything on the list, because configurations
118 // often initialize this with a maximal static list of everything
119 // that could possibly be written.
120 {
121 ATH_CHECK( m_transient.retrieve() );
122 IProperty *pAsIProp = dynamic_cast<IProperty*> (&*m_transient);
123 if (!pAsIProp) {
124 ATH_MSG_FATAL ("Bad folder interface");
125 return StatusCode::FAILURE;
126 }
127 ATH_CHECK (pAsIProp->setProperty("ItemList", m_transientItems.toString()));
128
129 for (const SG::FolderItem& item : *m_p2BWritten) {
130 // Load ROOT dictionaries now.
131 loadDict (item.id());
132
133 const std::string& k = item.key();
134 if (k.find('*') != std::string::npos) continue;
135 if (k.find('.') != std::string::npos) continue;
136 for (const SG::FolderItem& titem : *m_transient) {
137 if (titem.id() == item.id() && titem.key() == k) {
138 DataObjID id (item.id(), std::format("{}+{}", m_dataStore.name(), k));
139 this->addDependency (id, Gaudi::DataHandle::Reader);
140 break;
141 }
142 }
143 }
144 m_transient->clear();
145 }
146
147 // Also load dictionaries for metadata classes.
148 if (!m_metadataItemList.value().empty()) {
149 IProperty *pAsIProp = dynamic_cast<IProperty*> (&*m_transient);
150 if (!pAsIProp) {
151 ATH_MSG_FATAL ("Bad folder interface");
152 return StatusCode::FAILURE;
153 }
154 ATH_CHECK (pAsIProp->setProperty("ItemList", m_metadataItemList.toString()));
155 for (const SG::FolderItem& item : *m_transient) {
156 loadDict (item.id());
157 }
158 m_transient->clear();
159 }
160
161 // Also make sure we have the dictionary for Token.
162 m_dictLoader->load_type ("Token");
163
164 // Listen to event range incidents if incident name is configured
165 ATH_CHECK( m_incidentSvc.retrieve() );
166 if( !m_outSeqSvc->incidentName().empty() ) {
167 // use priority 95 to make sure the Output Sequencer goes first (it has priority 100)
168 m_incidentSvc->addListener(this, IncidentType::BeginProcessing, 95);
169 m_incidentSvc->addListener(this, IncidentType::EndProcessing, 95);
170 }
171
172 // Check compression settings and print some information about the configuration
173 // Both should be between [5, 23] and high compression should be < low compression
175 ATH_MSG_INFO(std::format("Float compression mantissa bits for high compression "
176 "({}) is outside the allowed range of [5, 23].",
177 m_compressionBitsHigh.toString()));
178 ATH_MSG_INFO("Setting it to the appropriate limit.");
180 }
182 ATH_MSG_INFO(std::format("Float compression mantissa bits for low compression "
183 "({}) is outside the allowed range of [5, 23].",
184 m_compressionBitsLow.toString()));
185 ATH_MSG_INFO("Setting it to the appropriate limit.");
187 }
189 ATH_MSG_ERROR(std::format("Float compression mantissa bits for low compression "
190 "({}) is lower than or equal to high compression "
191 "({})! Please check the configuration! ",
192 m_compressionBitsLow.toString(),
193 m_compressionBitsHigh.toString()));
194 return StatusCode::FAILURE;
195 }
196 if(m_compressionListHigh.value().empty() && m_compressionListLow.value().empty()) {
197 ATH_MSG_VERBOSE("Both high and low float compression lists are empty. Float compression will NOT be applied.");
198 } else {
199 ATH_MSG_INFO("Either high or low (or both) float compression lists are defined. Float compression will be applied.");
200 ATH_MSG_INFO(std::format("High compression will use {} mantissa bits, and "
201 "low compression will use {} mantissa bits.",
202 m_compressionBitsHigh.toString(),
203 m_compressionBitsLow.toString()));
204 }
205
206 // Setup stream name
207 if (m_streamName.empty()) {
208 m_streamName.setValue(this->name());
209 }
210
211 // Set SG key for selected variable information.
212 m_selVetoesKey = std::format("SelectionVetoes_{}", m_streamName.toString());
213 ATH_CHECK( m_selVetoesKey.initialize() );
214
215 m_compInfoKey = std::format("CompressionInfo_{}", m_streamName.toString());
216 ATH_CHECK( m_compInfoKey.initialize() );
217
218 ATH_MSG_DEBUG("End initialize");
219 return StatusCode::SUCCESS;
220}
221
222// Handle incidents
// Incident handler.
// Holds m_mutex for the whole call, then:
//  - "MetaDataStop": when the output sequence service is in use, either
//    finalizes every remaining range sub-stream (concurrent-events mode) or
//    returns early if the last incident was "EndEvent"; in all other cases
//    the metadata is written through writeMetaData().
//  - any other incident while the output sequence service is in use:
//    BeginProcessing/EndProcessing maintain the slot -> range file name map
//    and finalize a range once this slot is the only one still referring to it.
// Throws GaudiException when such an incident carries an invalid slot.
223void AthenaOutputStream::handle(const Incident& inc)
224{
225 ATH_MSG_DEBUG(std::format("handle() incident type: {}", inc.type()));
226 // mutex shared with write() which is called from writeMetaData
    // NOTE(review): write() locks m_mutex again on the writeMetaData() path
    // started below; presumably mutex_t is a recursive mutex - confirm.
227 std::unique_lock<mutex_t> lock(m_mutex);
228
229 if( inc.type() == "MetaDataStop" ) {
230 if( m_outSeqSvc->inUse() ) {
231 if( m_outSeqSvc->inConcurrentEventsMode() ) {
232 // EventService MT - write metadata and close all remaining substreams
    // The loop terminates because finalizeRange() erases the entry it handled.
    // NOTE(review): the argument is a reference to a key stored inside
    // m_streamerMap; finalizeRange() erases that entry, so the callee must not
    // touch the reference after the erase.
233 while( m_streamerMap.size() > 0 ) {
234 finalizeRange( inc.context(), m_streamerMap.begin()->first );
235 }
236 return;
237 }
238 if( m_outSeqSvc->lastIncident() == "EndEvent" ) {
239 // in r22 EndEvent comes before output writing
240 // - queue metadata writing and disconnect for after Event write
242 return;
243 }
244 }
245 // not in Event Service
246 writeMetaData(inc.context());
247 }
248 else if( m_outSeqSvc->inUse() ) {
249 // Handle Event Ranges for Event Service
250 EventContext::ContextID_t slot = inc.context().slot();
251 if( slot == EventContext::INVALID_CONTEXT_ID ) {
252 throw GaudiException("Received Incident with invalid slot in ES mode", name(), StatusCode::FAILURE);
253 }
    // Number of slots whose current range file name equals `range`.
254 auto count_events_in_range = [&](const std::string& range) {
255 return std::count_if(m_slotRangeMap.cbegin(), m_slotRangeMap.cend(),
256 [&](auto& el){return el.second == range;} );
257 };
258 if( inc.type() == IncidentType::BeginProcessing ) {
259 // get the current/old range filename for this slot
    // (operator[] creates an empty entry for a slot seen for the first time)
260 const std::string rangeFN = m_slotRangeMap[ slot ];
261 // build the new range filename for this slot
262 const std::string newRangeFN = m_outSeqSvc->buildSequenceFileName(inc.context(), m_outputName );
263 if( !rangeFN.empty() and rangeFN != newRangeFN ) {
264 ATH_MSG_INFO(std::format("Slot range change: '{}' -> '{}'", rangeFN, newRangeFN));
265 ATH_MSG_DEBUG(std::format("There are {} slots in use",m_slotRangeMap.size()));
266 for(const auto & range : m_slotRangeMap ) {
267 ATH_MSG_DEBUG(std::format("Slot: {} FN={}", range.first, range.second));
268 }
    // Only this slot still points at the old range: it can be closed now.
269 if( count_events_in_range(rangeFN) == 1 ) {
270 finalizeRange( inc.context(), rangeFN );
271 }
272 }
273 ATH_MSG_INFO(std::format("slot {} processing event in range: {}", slot, newRangeFN));
274 m_slotRangeMap[ slot ] = newRangeFN;
275 // remember the RangeID for this slot so we can write metadata *after* a range change
276 m_rangeIDforRangeFN[ newRangeFN ] = m_outSeqSvc->currentRangeID(inc.context());
277 }
278 else if( inc.type() == IncidentType::EndProcessing ) {
279 ATH_MSG_DEBUG(std::format("There are {} slots in use", m_slotRangeMap.size()));
280 for( const auto& range : m_slotRangeMap ) {
281 ATH_MSG_DEBUG(std::format("Slot: {} FN={}", range.first, range.second));
282 }
283 if( m_slotRangeMap.size() > 1 ) {
284 // if there are multiple slots, we can detect if the range ended with this event
285 // - except the last range, because there is no next range to clear the slot map
286 const std::string rangeFN = m_slotRangeMap[ slot ];
287 if( count_events_in_range(rangeFN) == 1 ) {
288 finalizeRange( inc.context(), rangeFN );
    // Forget the range for this slot (the map entry itself stays, with an empty name).
289 m_slotRangeMap[ slot ].clear();
290 }
291 }
292 }
293 }
294 ATH_MSG_DEBUG(std::format("Leaving incident handler for {}", inc.type()));
295}
296
297// Note - this method works in any slot - MetaCont uses the filenames to find objects
// Write the metadata for one event range and close its sub-stream.
//
// The RangeID remembered for @p rangeFN is temporarily set on the output
// sequence service while writeMetaData() runs and is restored afterwards.
// The per-range streamer tool is then finalized (status codes deliberately
// ignored), removed from m_streamerMap, and the range report is published.
//
// @param ctx      event context used for the RangeID switch and the metadata write
// @param rangeFN  output file name of the range to close; it may refer to the
//                 key stored inside m_streamerMap (handle() passes it that way)
void AthenaOutputStream::finalizeRange( const EventContext& ctx, const std::string & rangeFN )
{
  // The map entry for this range is erased below. If the caller handed us a
  // reference to that entry's key, rangeFN would dangle after the erase, so
  // every use in this function goes through a private copy.
  const std::string rangeName = rangeFN;

  ATH_MSG_DEBUG(std::format("Writing MetaData to {}", rangeName));
  // MN: not calling StopMetaData Incident here but directly writeMetaData() - OK for Sim, check others
  // metadata tools like CutFlowSvc are not able to handle this yet
  const std::string rememberID = m_outSeqSvc->setRangeID( ctx, m_rangeIDforRangeFN[ rangeName ] );
  writeMetaData( ctx, rangeName );
  m_outSeqSvc->setRangeID( ctx, rememberID );

  ATH_MSG_INFO(std::format("Finished writing Event Sequence to {}", rangeName));
  auto strm_iter = m_streamerMap.find( rangeName );
  if( strm_iter != m_streamerMap.end() ) {
    // The entry can exist without a tool (operator[] lookups default-insert a
    // null pointer), so only finalize a streamer that is really there.
    if( strm_iter->second ) {
      strm_iter->second->finalizeOutput().ignore();
      strm_iter->second->finalize().ignore();
    } else {
      ATH_MSG_WARNING(std::format("No streamer tool registered for range {}", rangeName));
    }
    m_streamerMap.erase( strm_iter );
  } else {
    ATH_MSG_WARNING(std::format("No streamer entry found for range {}", rangeName));
  }
  m_outSeqSvc->publishRangeReport( rangeName );
}
314
315// Method to write MetaData for this stream
316// in ES mode the range substream is determined by the current Event slot
317// called from the incident handler - returns void and throws GaudiExceptions on errors
// Write the metadata for this stream.
// @param ctx       event context; used to build the output file name and
//                  forwarded to write()
// @param outputFN  range output file name; when empty the main streamer tool
//                  is used, otherwise the per-file entry of m_streamerMap
// Throws GaudiException on every failure (helper tool preFinalize, MetaDataSvc
// prepareOutput, connecting services, folder property updates, write()).
318void AthenaOutputStream::writeMetaData(const EventContext& ctx, const std::string& outputFN)
319{
320 // use main stream tool by default, or per outputFile in ES mode
    // NOTE(review): operator[] default-inserts a null entry for an unknown
    // outputFN; `streamer` would then be null and the connectServices() calls
    // below would dereference it - confirm callers only pass known range names.
321 IAthenaOutputStreamTool* streamer = outputFN.empty()? &*m_streamer : m_streamerMap[outputFN].get();
322
323 for (auto& tool : m_helperTools) {
324 if (!tool->preFinalize().isSuccess()) {
325 throw GaudiException("Cannot finalize helper tool", name(), StatusCode::FAILURE);
326 }
327 }
328 if( m_metaDataSvc->prepareOutput(outputFN).isFailure() ) {
329 throw GaudiException("Failed on MetaDataSvc prepareOutput", name(), StatusCode::FAILURE);
330 }
331 // lock all metadata to prevent updates during writing
333
334 // Prepare the WriteDataHeaderForms incident
335 std::string DHFWriteIncidentfileName = m_outSeqSvc->buildSequenceFileName(ctx, m_outputName);
336 // remove technology from the name
    // (everything up to and including the first ':' is dropped)
337 size_t pos = DHFWriteIncidentfileName.find(':');
338 if( pos != std::string::npos ) DHFWriteIncidentfileName = DHFWriteIncidentfileName.substr(pos+1);
339 FileIncident incident(name(), "WriteDataHeaderForms", DHFWriteIncidentfileName);
340 m_incidentSvc->fireIncident(incident);
341
342 ATH_MSG_DEBUG("metadataItemList: " << m_metadataItemList.value() );
343 if (!m_metadataItemList.value().empty()) {
    // Point the streamer at the metadata store for the duration of the write.
345 StatusCode status = streamer->connectServices(m_metadataStore.typeAndName(), m_persName, false);
346 if (status.isFailure()) {
347 throw GaudiException("Unable to connect metadata services", name(), StatusCode::FAILURE);
348 }
    // These attributes are appended to the output name by write() when it
    // builds its connection string; they are cleared again further down.
349 m_outputAttributes = "[OutputCollection=MetaDataHdr][PoolContainerPrefix=MetaData][AttributeListKey=]";
    // Swap the folder contents from the event item list to the metadata item list.
350 m_p2BWritten->clear();
351 IProperty *pAsIProp(nullptr);
352 if ((m_p2BWritten.retrieve()).isFailure() ||
353 nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
354 (pAsIProp->setProperty("ItemList", m_metadataItemList.toString())).isFailure()) {
355 throw GaudiException("Folder property [metadataItemList] not found", name(), StatusCode::FAILURE);
356 }
357 if (write(ctx).isFailure()) {
358 throw GaudiException("Cannot write metadata", name(), StatusCode::FAILURE);
359 }
    // Second WriteDataHeaderForms incident, this time with the metadata
    // attributes appended to the file name. This `incident` shadows the one
    // declared before the if-block.
360 FileIncident incident(name(), "WriteDataHeaderForms", DHFWriteIncidentfileName + m_outputAttributes);
361 m_incidentSvc->fireIncident(incident);
362
363 m_outputAttributes.clear();
    // Restore the event-data configuration: reconnect to the data store and
    // put the regular item list back into the folder.
365 status = streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord);
366 if (status.isFailure()) {
367 throw GaudiException("Unable to re-connect services", name(), StatusCode::FAILURE);
368 }
369 m_p2BWritten->clear();
370 if ((pAsIProp->setProperty(m_itemList)).isFailure()) {
371 throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
372 }
373 ATH_MSG_DEBUG(std::format("Metadata items written: {}", m_metadataItemList.value().size()));
374 }
375}
376
377// Terminate data writer
379{
380 bool failed = false;
381 ATH_MSG_DEBUG("finalize: Optimize output");
382 // Connect the output file to the service
383 if (!m_streamer->finalizeOutput().isSuccess()) {
384 failed = true;
385 }
386 ATH_MSG_DEBUG("finalize: end optimize output");
387 // Release the tools
388 if (!m_helperTools.release().isSuccess()) {
389 failed = true;
390 }
391 if (!m_streamer.release().isSuccess()) {
392 failed = true;
393 }
394 if (failed) {
395 return(StatusCode::FAILURE);
396 }
397 // Clear the internal caches
398 m_objects.clear();
399 m_objects.shrink_to_fit();
400 m_ownedObjects.clear();
401 m_altObjects.clear();
402 return(StatusCode::SUCCESS);
403}
404
405// Execute data writer
// Per-event entry point.
// Runs the helper tools' preExecute hooks, writes the event if it is accepted,
// then runs the postExecute hooks. A failing step does not stop the sequence:
// failures are remembered in `failed` and reported as one FAILURE at the end.
// @param ctx  event context forwarded to write() and writeMetaData()
// @return FAILURE if any hook, the write, or the trailing finalize() failed
406StatusCode AthenaOutputStream::execute(const EventContext& ctx) {
407 bool failed = false;
408 // Call tool preExecute prior to writing
409 for (auto& tool : m_helperTools) {
410 if (!tool->preExecute().isSuccess()) {
411 failed = true;
412 }
413 }
414 // Write the event if the event is accepted
415 if (isEventAccepted()) {
416 if (write(ctx).isFailure()) {
417 failed = true;
418 }
419 }
420 // Call tool postExecute after writing
421 for (auto& tool : m_helperTools) {
422 if(!tool->postExecute().isSuccess()) {
423 failed = true;
424 }
425 }
426 // See if we should write metadata and do if so
    // writeMetaData() reports errors by throwing GaudiException, so there is
    // no status code to fold into `failed` here.
428 writeMetaData(ctx);
430 // finalize will disconnect output
431 if( !finalize().isSuccess() ) {
432 failed = true;
433 }
434 }
435 if (failed) {
436 return(StatusCode::FAILURE);
437 }
438 return(StatusCode::SUCCESS);
439}
440
441// The main method that performs the writing
// Stream the currently selected objects to the output.
// With m_mutex held for the whole call (the unlock further down is commented
// out): picks the streamer tool (in concurrent event-range mode the per-file
// one, created and initialized on first use), takes over the collected object
// lists, runs the helper tools' preStream hooks, then connects the output,
// streams the objects and calls commitOutput.
// @param ctx  event context used to build the output file name
// @return FAILURE when tool setup, connectOutput, commitOutput or a
//         non-recoverable streamObjects call fails; a recoverable streamObjects
//         failure is only logged at DEBUG level. m_events is incremented on success.
442StatusCode AthenaOutputStream::write(const EventContext& ctx) {
443 bool failed = false;
445 std::string outputFN;
446
447 std::unique_lock<mutex_t> lock(m_mutex);
448 outputFN = m_outSeqSvc->buildSequenceFileName( ctx, m_outputName );
449
450 // Handle Event Ranges
451 if( m_outSeqSvc->inUse() and m_outSeqSvc->inConcurrentEventsMode() ) {
452 ATH_MSG_DEBUG(std::format("Writing event sequence to {}", outputFN));
    // operator[] default-inserts a null entry for a range seen for the first time.
453 streamer = m_streamerMap[ outputFN ].get();
454 if( !streamer ) {
455 // new range, needs a new streamer tool
    // The tool is created through the AlgTool factory with the same type as
    // m_streamer and this algorithm as parent; ownership is taken out of the
    // factory's return value (release()) and a reference is added by hand.
    // NOTE(review): the early returns below (failed cast, ATH_CHECK, failed
    // initialize) leave `st` neither released nor stored in m_streamerMap -
    // this looks like a leak on the error paths; confirm. `streamer` is also
    // dereferenced (&*streamer) before the `!streamer` test further down.
456 IAlgTool* st = AlgTool::Factory::create( m_streamer->type(), m_streamer->type(), m_streamer->name(), this ).release();
457 st->addRef();
458 streamer = dynamic_cast<IAthenaOutputStreamTool*>( st );
459 IProperty *mstreamer_props = dynamic_cast<IProperty*> (&*m_streamer);
460 IProperty *streamer_props = dynamic_cast<IProperty*> (&*streamer);
461 if (!mstreamer_props || !streamer_props) {
462 ATH_MSG_FATAL("Cannot cast streamer to IProperty");
463 return StatusCode::FAILURE;
464 }
    // Copy every property of the main streamer tool onto the new per-range tool.
465 for ( const auto& prop : mstreamer_props->getProperties() ) {
466 ATH_CHECK( streamer_props->setProperty( *prop ) );
467 }
468 if( !streamer or streamer->initialize().isFailure()
469 or streamer->connectServices(m_dataStore.typeAndName(), m_persName, m_extendProvenanceRecord).isFailure() ) {
470 ATH_MSG_FATAL(std::format("Unable to initialize OutputStreamTool for {}", outputFN));
471 return StatusCode::FAILURE;
472 }
    // From here on the map entry owns the tool.
473 m_streamerMap[ outputFN ].reset( streamer );
474 }
475 }
476
477 // Clear any previously existing item list
478 // and collect all objects that are asked to be written out
481
482 // keep a local copy of the object lists so they are not overwritten when we release the lock
483 IDataSelector objects = std::move( m_objects );
484 IDataSelector altObjects = std::move( m_altObjects );
485 std::vector<std::unique_ptr<DataObject> > ownedObjects = std::move( m_ownedObjects );
486
487 // prepare before releasing lock because m_outputAttributes change in metadataStop
488 const std::string connectStr = outputFN + m_outputAttributes;
489
490 for (auto& tool : m_helperTools) {
491 ATH_CHECK( tool->preStream() );
492 }
493
494 // MN: would be nice to release the Stream lock here
495 // lock.unlock();
496
497 // Connect the output file to the service
498 if (!streamer->connectOutput(connectStr).isSuccess()) {
499 ATH_MSG_FATAL("Could not connectOutput");
500 return StatusCode::FAILURE;
501 }
502 ATH_MSG_DEBUG(std::format("connectOutput done for {}", outputFN));
503 StatusCode currentStatus = streamer->streamObjects(objects, connectStr);
504 // Do final check of streaming
505 if (!currentStatus.isSuccess()) {
506 if (!currentStatus.isRecoverable()) {
507 ATH_MSG_FATAL("streamObjects failed.");
508 failed = true;
509 } else {
510 ATH_MSG_DEBUG("streamObjects failed.");
511 }
512 }
    // commitOutput is still called after a streamObjects failure.
513 bool doCommit = false;
514 if (!streamer->commitOutput(doCommit).isSuccess()) {
515 ATH_MSG_FATAL("commitOutput failed.");
516 failed = true;
517 }
518 if (failed) {
519 return(StatusCode::FAILURE);
520 }
521 m_events++;
522 return(StatusCode::SUCCESS);
523}
524
525// Clear collected object list
527 m_objects.clear();
528 m_ownedObjects.clear();
529 m_altObjects.clear();
530}
531
532// Collect objects
533StatusCode AthenaOutputStream::collectAllObjects(const EventContext& ctx) {
534 if (m_itemListFromTool) {
535 if (!m_streamer->getInputItemList(&*m_p2BWritten).isSuccess()) {
536 ATH_MSG_WARNING("collectAllObjects() could not get ItemList from Tool.");
537 }
538 }
539
540 // This holds the vetoes for the AuxID selection
541 auto vetoes = std::make_unique<SG::SelectionVetoes>();
542 // This holds the lossy float compression information
543 auto compInfo = std::make_unique<SG::CompressionInfo>();
544
545 m_p2BWritten->updateItemList(true);
546 // Collect all objects that need to be persistified:
547 for (const auto& i : *m_p2BWritten) {
548 ATH_CHECK( addItemObjects(ctx, i, *vetoes, *compInfo) );
549 }
550
551 // If there were any variable selections, record the information in SG.
552 if (!vetoes->empty()) {
553 ATH_CHECK( SG::makeHandle (m_selVetoesKey, ctx).record (std::move (vetoes)) );
554 }
555
556 // Store the lossy float compression information in the SG.
557 if (!compInfo->empty()) {
558 ATH_CHECK( SG::makeHandle (m_compInfoKey, ctx).record (std::move (compInfo)) );
559 }
560
561 return StatusCode::SUCCESS;
562}
563
564// Build a list of objects we're going to write out
565// This function also builds the list of vetoed AuxIDs
566// and the lossy float compression lists.
// Resolve one folder item into the objects to be written.
// The item key is split at its first '.': the part up to and including the
// dot is the StoreGate key, the remainder a '.'-separated list of aux
// attributes. Matching proxies are looked up in the current store (exact
// key first, then all proxies of the remapped CLID) and their objects are
// appended to m_objects (m_ownedObjects/m_altObjects for wrapped objects).
// For event-store aux stores the variable selection and the lossy float
// compression information are filled as well.
// @param ctx       event context, used to pick the current RangeID for MetaCont lookups
// @param item      folder item (CLID, key, exact flag) to resolve
// @param vetoes    in/out: aux-variable vetoes, filled via handleVariableSelection()
// @param compInfo  in/out: per-container map of mantissa bits -> aux IDs
// @return FAILURE only when an object cannot be retrieved from a MetaCont;
//         items that match nothing are logged at DEBUG level and give SUCCESS
567StatusCode AthenaOutputStream::addItemObjects(const EventContext& ctx,
568 const SG::FolderItem& item,
569 SG::SelectionVetoes& vetoes,
570 SG::CompressionInfo& compInfo)
571{
572 // anything after a dot is a list of dynamic Aux attributes, separated by dots
573 size_t dotpos = item.key().find('.');
574 std::string item_key, aux_attr;
575 if( dotpos != std::string::npos ) {
    // Note: the key keeps its trailing '.' (dotpos+1 characters are taken).
576 item_key = item.key().substr(0, dotpos+1);
577 aux_attr = item.key().substr(dotpos+1);
578 } else {
579 item_key = item.key();
580 }
581 CLID item_id = item.id();
582 ATH_MSG_DEBUG(std::format("addItemObjects({},\"{}\") called", item_id, item_key));
583 ATH_MSG_DEBUG(std::format(" Key:{}", item_key));
584 if( aux_attr.size() ) {
585 ATH_MSG_DEBUG(std::format(" Aux Attr:{}", aux_attr));
586 }
587
588 // Here we build the list of attributes for the lossy float compression
589 // Note that we do not allow m_compressionBitsHigh >= m_compressionBitsLow
590 // Otherwise is, in any case, a logical error and they'd potentially overwrite each other
    // (the map is keyed by the number of mantissa bits, so equal settings
    // would make the second assignment replace the first)
591 std::map< unsigned int, std::set< std::string > > comp_attr_map;
592 comp_attr_map[ m_compressionBitsHigh ] = buildCompressionSet( m_compressionDecoderHigh, item_id, item_key );
593 comp_attr_map[ m_compressionBitsLow ] = buildCompressionSet( m_compressionDecoderLow, item_id, item_key );
594
595 // Print some debugging information regarding the lossy float compression configuration
596 for( const auto& it : comp_attr_map ) {
597 ATH_MSG_DEBUG(std::format(" Comp Attr {} with {} mantissa bits.", it.second.size(), it.first));
598 if ( it.second.size() > 0 ) {
599 for( const auto& attr : it.second ) {
600 ATH_MSG_DEBUG(std::format(" >> {}", attr));
601 }
602 }
603 }
604
605 // For MetaData objects of type T that are kept in MetaContainers get the MetaCont<T> ID
606 const CLID remapped_item_id = m_metaDataSvc->remapMetaContCLID( item_id );
607 SG::ConstProxyIterator iter, end;
609 bool gotProxies = false;
610 // Look for the clid in storegate
    // First try an exact (CLID, key) lookup; a hit becomes a one-element range.
611 SG::DataProxy* match = (*m_currentStore)->proxy(remapped_item_id, item_key, true);
612 if (match != nullptr) {
613 map.insert({item_key, match});
614 iter = map.begin();
615 end = map.end();
616 gotProxies = true;
617 }
618 // Look for the clid in storegate
    // No exact hit: fall back to every proxy registered for the CLID.
619 if (!gotProxies && ((*m_currentStore)->proxyRange(remapped_item_id, iter, end)).isSuccess()) {
620 gotProxies = true;
621 }
622 if (gotProxies) {
623 bool added = false, removed = false;
624 // Now loop over any found proxies
625 for (; iter != end; ++iter) {
626 SG::DataProxy* itemProxy(iter->second);
627 std::string proxyName = itemProxy->name();
628 std::string stream;
630 // only check metadata keys
631 stream = m_metaDataSvc->removeStreamFromKey(proxyName); // can modify proxyName
632 }
633 // Does this key match the proxy key name - allow for wildcarding and aliases
634 bool keyMatch = ( item_key == "*" ||
635 item_key == proxyName ||
636 itemProxy->hasAlias(item_key) );
637 if (!keyMatch) {
638 // For item list we currently allow wildcards ('*'), which has limited use, e.g.:
639 // xAOD::CutBookkeeperAuxContainer#IncompleteCutBookkeepers*Aux.
640 // Here we look for those few cases...
641 keyMatch = simpleMatch(item_key, proxyName);
642 ATH_MSG_DEBUG(std::format("Result of checking {} against {} to see if it matches is {}",
643 proxyName, item_key, keyMatch));
644 }
645
646 // Now check if this item is marked for another output stream, if so we reject it
647 // We also reject keys that are marked transient at this point
648 bool xkeyMatch = false;
649 if( (!stream.empty() and stream != m_outputName) || SG::isTransientKey(proxyName) ) {
650 // reject keys that are marked for a different output stream
651 ATH_MSG_DEBUG(std::format("Rejecting key: {} in output: {}", itemProxy->name(), m_outputName.toString()));
652 xkeyMatch = true;
653 }
654
655 // All right, it passes key match find in itemList, but not in excludeList
656 if (keyMatch && !xkeyMatch) {
    // With m_forceRead set, trigger the read of a valid proxy; a failed
    // read is logged but does not stop the processing.
657 if (m_forceRead && itemProxy->isValid()) {
658 if (nullptr == itemProxy->accessData()) {
659 ATH_MSG_ERROR(std::format(" Could not get data object for id {},\"{}\"", remapped_item_id, proxyName));
660 }
661 }
662 if (nullptr != itemProxy->object()) {
    // Skip objects that were already collected, either directly
    // (m_objects) or through a wrapper (m_altObjects).
663 if( std::find(m_objects.begin(), m_objects.end(), itemProxy->object()) == m_objects.end() &&
664 std::find(m_altObjects.begin(), m_altObjects.end(), itemProxy->object()) == m_altObjects.end() )
665 {
666 if( item_id != remapped_item_id ) {
667 // For MetaCont<T>: -
668 // create a temporary DataObject for an entry in the container to pass to CnvSvc
669 DataBucketBase* dbb = static_cast<DataBucketBase*>( itemProxy->object() );
670 const MetaContBase* metaCont = static_cast<MetaContBase*>( dbb->cast( ClassID_traits<MetaContBase>::ID() ) );
671 void* obj = metaCont? metaCont->getAsVoid( m_outSeqSvc->currentRangeID(ctx) ) : nullptr;
672 if( obj ) {
673 auto altbucket = std::make_unique<AltDataBucket>(
674 obj, item_id, *CLIDRegistry::CLIDToTypeinfo(item_id), proxyName );
    // m_objects holds a non-owning pointer; m_ownedObjects keeps the wrapper alive.
675 m_objects.push_back( altbucket.get() );
676 m_ownedObjects.push_back( std::move(altbucket) );
677 m_altObjects.push_back( itemProxy->object() ); // only for duplicate prevention
678 } else {
679 ATH_MSG_ERROR(std::format("Failed to retrieve object from MetaCont with key={}, for EventRangeID={}",
680 item_key, m_outSeqSvc->currentRangeID(ctx)));
681 return StatusCode::FAILURE;
682 }
683 } else if (item.exact()) {
684 // If the exact flag is set, make a new DataObject
685 // holding the object as the requested type.
686 DataBucketBase* dbb = dynamic_cast<DataBucketBase*> (itemProxy->object());
    // An object that is not a DataBucketBase terminates the process here.
687 if (!dbb) std::abort();
688 void* ptr = dbb->cast (item_id);
689 if (!ptr) {
690 // Hard cast
691 ptr = dbb->object();
692 }
693 auto altbucket =
694 std::make_unique<AltDataBucket>
695 (ptr, item_id,
697 *itemProxy);
698 m_objects.push_back(altbucket.get());
699 m_ownedObjects.push_back (std::move(altbucket));
700 m_altObjects.push_back (itemProxy->object());
701 }
702 else
703 m_objects.push_back(itemProxy->object());
704 ATH_MSG_DEBUG(std::format(" Added object {},\"{}\"", item_id, proxyName));
705 }
706
    // Variable selection / compression only apply to event-store items whose
    // key ends with the aux postfix.
    // NOTE(review): item_key.size() - 4 is unsigned arithmetic. For keys
    // shorter than 4 characters it wraps around, and for a 3-character key it
    // equals std::string::npos, so a failed find() compares equal and the
    // key.erase(key.size()-4, 4) below would throw std::out_of_range. This
    // also assumes AUX_POSTFIX is 4 characters long (presumably "Aux.") -
    // confirm and consider an explicit length/ends-with check.
710 if ((*m_currentStore)->storeID() == StoreID::EVENT_STORE &&
711 item_key.find( RootAuxDynIO::AUX_POSTFIX ) == ( item_key.size() - 4 )) {
712
713 const SG::IConstAuxStore* auxstore( nullptr );
714 try {
715 SG::fromStorable( itemProxy->object(), auxstore, true );
716 } catch( const std::exception& ) {
717 ATH_MSG_DEBUG(std::format("Error in casting object with CLID {} to SG::IConstAuxStore*", itemProxy->clID()));
718 auxstore = nullptr;
719 }
720
721 if (auxstore) {
722 handleVariableSelection (*auxstore, *itemProxy,
723 aux_attr, vetoes);
724
725 // Here comes the compression logic using ThinningInfo
726 // Get a hold of all AuxIDs for this store (static, dynamic etc.)
727 const SG::auxid_set_t allVars = auxstore->getAuxIDs();
728
729 // Get a handle on the compression information for this store
    // (the container key is the item key without its last 4 characters)
730 std::string key = item_key;
731 key.erase (key.size()-4, 4);
732
733 // Build the compression list, retrieve the relevant AuxIDs and
734 // store it in the relevant map that is going to be inserted into
735 // the ThinningCache later on by the ThinningCacheTool
736 xAOD::AuxCompression compression;
737 compression.setCompressedAuxIDs( comp_attr_map );
738 for( const auto& it : compression.getCompressedAuxIDs( allVars ) ) {
739 if( it.second.size() > 0 ) { // insert only if the set is non-empty
740 compInfo[ key ][ it.first ] = it.second;
741 ATH_MSG_DEBUG(std::format("Container {} has {} variables that'll be "
742 "lossy float compressed with {} mantissa bits",
743 key, it.second.size(), it.first));
744 }
745 } // End of loop over variables to be lossy float compressed
746 } // End of lossy float compression logic
747
748 }
749
750 added = true;
751 }
752 } else if (keyMatch && xkeyMatch) {
753 removed = true;
754 }
755 } // proxy loop
756 if (!added && !removed) {
757 ATH_MSG_DEBUG(std::format(" No object matching {},\"{}\" found", item_id, item_key));
758 } else if (removed) {
759 ATH_MSG_DEBUG(std::format(" Object being excluded based on property setting {},\"{}\". Skipping",
760 item_id, item_key));
761 }
762 } else {
763 ATH_MSG_DEBUG(std::format(" Failed to receive proxy iterators from StoreGate for {},\"{}\". Skipping",
764 item_id, item_key));
765 }
766 return StatusCode::SUCCESS;
767}
768
774std::set<std::string>
775AthenaOutputStream::buildCompressionSet (const ToolHandle<SG::IFolder>& handle,
776 const CLID& item_id,
777 const std::string& item_key) const
778{
779 // Create an empty result
780 std::set<std::string> result;
781
782 // Check the item is indeed Aux.
783 if(item_key.find("Aux.") == std::string::npos) {
784 return result;
785 }
786
787 // First the high compression list
788 for (const auto& iter : *handle) {
789 // First match the IDs for early rejection.
790 if (iter.id() != item_id) {
791 continue;
792 }
793 // Then find the compression item key and the compression list string
794 size_t seppos = iter.key().find('.');
795 std::string comp_item_key{""}, comp_str{""};
796 if(seppos != std::string::npos) {
797 comp_item_key = iter.key().substr(0, seppos+1);
798 comp_str = iter.key().substr(seppos+1);
799 } else {
800 comp_item_key = iter.key();
801 }
802 // Proceed only if the keys match and the
803 // compression list string is not empty
804 if (!comp_str.empty() && comp_item_key == item_key) {
805 std::stringstream ss(comp_str);
806 std::string attr;
807 while( std::getline(ss, attr, '.') ) {
808 result.insert(attr);
809 }
810 }
811 }
812
813 // All done, return the result
814 return result;
815}
816
// Translate a dotted aux-attribute selection into a set of vetoed aux IDs.
// @param auxstore   store whose getAuxIDs() provides the full set of variable IDs
// @param itemProxy  proxy whose name (minus a trailing "Aux.") keys the veto entry
// @param aux_attr   '.'-separated attribute selection; empty means no selection
// @param vetoes     in/out: under that key, receives every aux ID of the store
//                   that the selection does not keep
// Does nothing (and creates no entry in @p vetoes) when there is no selection.
818void AthenaOutputStream::handleVariableSelection (const SG::IConstAuxStore& auxstore,
819 SG::DataProxy& itemProxy,
820 const std::string& aux_attr,
821 SG::SelectionVetoes& vetoes) const
822{
823 // Collect dynamic Aux selection (parse the line, attributes separated by dot)
824 std::set<std::string> attributes;
825 if( aux_attr.size() ) {
826 std::stringstream ss(aux_attr);
827 std::string attr;
828 while( std::getline(ss, attr, '.') ) {
829 attributes.insert(attr);
830 }
831 }
832
833 // Return early if there's no selection.
834 if (attributes.empty()) {
835 return;
836 }
837
    // The veto entry is keyed by the container name, i.e. the proxy name
    // with a trailing "Aux." removed when present.
838 std::string key = itemProxy.name();
839 if (key.size() >= 4 && key.compare (key.size()-4, 4, "Aux.")==0)
840 {
841 key.erase (key.size()-4, 4);
842 }
843
844 // Find the entry for the selection.
    // (operator[] creates the entry if it does not exist yet; IDs are added
    // to whatever is already stored there)
845 SG::auxid_set_t& vset = vetoes[key];
846
847 // Form the veto mask for this object.
849 sel.selectAux (attributes);
850
851 // Get all the AuxIDs that we know of and the selected ones
852 SG::auxid_set_t all = auxstore.getAuxIDs();
853 SG::auxid_set_t selected = sel.getSelectedAuxIDs( all );
854
855 // Loop over all and build a list of vetoed AuxIDs from non selected ones
856 for( const SG::auxid_t auxid : all ) {
857 if ( !selected.test( auxid ) ) {
858 vset.insert( auxid );
859 }
860 }
861}
862
863// Here comes the list handlers...
864void AthenaOutputStream::itemListHandler(Gaudi::Details::PropertyBase& /* theProp */) {
865 // Assuming concrete SG::Folder also has an itemList property
866 IProperty *pAsIProp(nullptr);
867 if ((m_p2BWritten.retrieve()).isFailure() ||
868 nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_p2BWritten)) ||
869 (pAsIProp->setProperty(m_itemList)).isFailure()) {
870 throw GaudiException("Folder property [itemList] not found", name(), StatusCode::FAILURE);
871 }
872}
873
874void AthenaOutputStream::compressionListHandlerHigh(Gaudi::Details::PropertyBase& /* theProp */) {
875 IProperty *pAsIProp(nullptr);
876 if ((m_compressionDecoderHigh.retrieve()).isFailure() ||
877 nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_compressionDecoderHigh)) ||
878 (pAsIProp->setProperty("ItemList", m_compressionListHigh.toString())).isFailure()) {
879 throw GaudiException("Folder property [ItemList] not found", name(), StatusCode::FAILURE);
880 }
881}
882
883void AthenaOutputStream::compressionListHandlerLow(Gaudi::Details::PropertyBase& /* theProp */) {
884 IProperty *pAsIProp(nullptr);
885 if ((m_compressionDecoderLow.retrieve()).isFailure() ||
886 nullptr == (pAsIProp = dynamic_cast<IProperty*>(&*m_compressionDecoderLow)) ||
887 (pAsIProp->setProperty("ItemList", m_compressionListLow.toString())).isFailure()) {
888 throw GaudiException("Folder property [ItemList] not found", name(), StatusCode::FAILURE);
889 }
890}
891
892// Reinitialize the internal state of the algorithm
893// for I/O purposes (e.g. upon @c fork(2))
895 ATH_MSG_INFO("I/O reinitialization...");
896 m_incidentSvc->removeListener(this, "MetaDataStop"); // Remove any existing listener to avoid handling the incident multiple times
897 m_incidentSvc->addListener(this, "MetaDataStop", 50);
898 for (auto& tool : m_helperTools) {
899 if (!tool->postInitialize().isSuccess()) {
900 ATH_MSG_ERROR("Cannot initialize helper tool");
901 }
902 }
903 return StatusCode::SUCCESS;
904}
905
906// Finalize the internal state of the algorithm
907// for I/O purposes (e.g. upon @c fork(2))
909 ATH_MSG_INFO("I/O finalization...");
910 for (auto& tool : m_helperTools) {
911 if (!tool->preFinalize().isSuccess()) {
912 ATH_MSG_ERROR("Cannot finalize helper tool");
913 }
914 }
915 const Incident metaDataStopIncident(name(), "MetaDataStop");
916 this->handle(metaDataStopIncident);
917 m_incidentSvc->removeListener(this, "MetaDataStop");
918 if (m_dataStore->clearStore().isFailure()) {
919 ATH_MSG_WARNING("Cannot clear the DataStore");
920 }
921 return StatusCode::SUCCESS;
922}
923
930{
931 m_dictLoader->load_type (clid);
932
933 // Also load the persistent class dictionary, if applicable.
934 std::unique_ptr<ITPCnvBase> tpcnv = m_tpCnvSvc->t2p_cnv_unique (clid);
935 if (tpcnv) {
936 m_dictLoader->load_type (tpcnv->persistentTInfo());
937 }
938}
939
941bool AthenaOutputStream::simpleMatch(const std::string& pattern,
942 const std::string& text) {
943 size_t pi = 0, ti = 0, star = std::string::npos, match = 0;
944 while (ti < text.size()) {
945 if (pi < pattern.size() && (pattern[pi] == text[ti] || pattern[pi] == '*')) {
946 if (pattern[pi] == '*') { star = pi++; match = ti; }
947 else { ++pi; ++ti; }
948 } else if (star != std::string::npos) {
949 pi = star + 1;
950 ti = ++match;
951 } else return false;
952 }
953 while (pi < pattern.size() && pattern[pi] == '*') ++pi;
954 return pi == pattern.size();
955}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
a static registry of CLID->typeName entries.
This file contains the class definition for the DataHeader and DataHeaderElement classes.
uint32_t CLID
The Class ID type.
Interface providing I/O for a generic auxiliary store.
Interface for non-const operations on an auxiliary store.
Interface for const operations on an auxiliary store.
virtual void lock()=0
Interface to allow an object to lock itself when made const in SG.
static Double_t ss
Handle class for recording to StoreGate.
#define pi
IDataSelector m_objects
Collection of objects being selected.
ServiceHandle< StoreGateSvc > m_metadataStore
virtual StatusCode finalize() override
StringProperty m_streamName
Stream name (defaults to algorithm name).
UnsignedIntegerProperty m_compressionBitsLow
Number of mantissa bits in the float compression.
UnsignedIntegerProperty m_compressionBitsHigh
Number of mantissa bits in the float compression.
StringArrayProperty m_transientItems
List of items that are known to be present in the transient store (and hence we can make input depend...
ToolHandle< SG::IFolder > m_compressionDecoderLow
The top-level folder with items to be compressed low.
ToolHandle< SG::IFolder > m_p2BWritten
The top-level folder with items to be written.
StringArrayProperty m_compressionListLow
Vector of item names.
ToolHandle< IAthenaOutputStreamTool > m_streamer
pointer to AthenaOutputStreamTool
ServiceHandle< MetaDataSvc > m_metaDataSvc
Handles to all the necessary services.
BooleanProperty m_itemListFromTool
Set to write out everything in input DataHeader.
StringProperty m_outputName
Name of the output file.
virtual ~AthenaOutputStream()
Standard Destructor.
std::map< std::string, std::unique_ptr< IAthenaOutputStreamTool > > m_streamerMap
map of streamerTools handling event ranges in MT
std::map< unsigned, std::string > m_slotRangeMap
map of filenames assigned to active slots
virtual void handle(const Incident &incident) override
Incident service handle listening for MetaDataStop.
std::set< std::string > buildCompressionSet(const ToolHandle< SG::IFolder > &handle, const CLID &item_id, const std::string &item_key) const
Helper function for building the compression lists.
AthenaOutputStream(const std::string &name, ISvcLocator *pSvcLocator)
Standard algorithm Constructor.
ToolHandleArray< IAthenaOutputTool > m_helperTools
vector of AlgTools that are executed by this stream
ToolHandle< SG::IFolder > m_compressionDecoderHigh
The top-level folder with items to be compressed high.
void clearSelection()
Clear list of selected objects.
StringArrayProperty m_itemList
Vector of item names.
ServiceHandle< IClassIDSvc > m_pCLIDSvc
ServiceHandle< IIncidentSvc > m_incidentSvc
void handleVariableSelection(const SG::IConstAuxStore &auxstore, SG::DataProxy &itemProxy, const std::string &aux_attr, SG::SelectionVetoes &vetoes) const
Here we build the vetoed AuxIDs.
ServiceHandle< IDictLoaderSvc > m_dictLoader
StatusCode addItemObjects(const EventContext &, const SG::FolderItem &, SG::SelectionVetoes &vetoes, SG::CompressionInfo &compInfo)
Add item data objects to output streamer list.
void finalizeRange(const EventContext &ctx, const std::string &rangeFN)
IDataSelector m_altObjects
Objects overridden by `exact' handling.
void compressionListHandlerLow(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
StringProperty m_persName
Name of the persistency service capable to write data from the store.
BooleanProperty m_forceRead
set to true to force read of data objects in item list
void loadDict(CLID clid)
Helper function to load dictionaries (both transient and persistent) for a given type.
virtual StatusCode execute(const EventContext &ctx) override
ServiceHandle< ITPCnvSvc > m_tpCnvSvc
StatusCode collectAllObjects(const EventContext &ctx)
Collect data objects for output streamer list.
std::string m_outputAttributes
Output attributes.
StringArrayProperty m_compressionListHigh
Vector of item names.
ServiceHandle< StoreGateSvc > * m_currentStore
StringArrayProperty m_metadataItemList
Vector of item names.
std::vector< std::unique_ptr< DataObject > > m_ownedObjects
Collection of DataObject instances owned by this service.
ServiceHandle< OutputStreamSequencerSvc > m_outSeqSvc
std::map< std::string, std::string > m_rangeIDforRangeFN
map of RangeIDs (as used by the Sequencer) for each Range filename generated
ToolHandle< SG::IFolder > m_transient
Decoded list of transient ids.
ServiceHandle< StoreGateSvc > m_dataStore
Handle to the StoreGateSvc store where the data we want to write out resides.
virtual StatusCode initialize() override
mutex_t m_mutex
mutex for this Stream write() and handle() methods
virtual StatusCode io_reinit() override
Callback method to reinitialize the internal state of the component for I/O purposes (e....
SG::WriteHandleKey< SG::CompressionInfo > m_compInfoKey
Key used for recording lossy float compressed variable information to the event store.
bool simpleMatch(const std::string &pattern, const std::string &text)
Glob-style matcher, where the only meta-character is '*'.
void compressionListHandlerHigh(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
SG::WriteHandleKey< SG::SelectionVetoes > m_selVetoesKey
Key used for recording selected dynamic variable information to the event store.
virtual StatusCode io_finalize() override
BooleanProperty m_extendProvenanceRecord
Set to false to omit adding the current DataHeader into the DataHeader history This will cause the in...
virtual StatusCode write(const EventContext &ctx)
Stream the data.
std::atomic< int > m_events
Number of events written to this output stream.
void writeMetaData(const EventContext &ctx, const std::string &outputFN="")
Write MetaData for this stream (by default) or for a substream outputFN (in ES mode).
void itemListHandler(Gaudi::Details::PropertyBase &)
Handler for ItemNames Property.
static const std::type_info * CLIDToTypeinfo(CLID clid)
Translate between CLID and type_info.
A non-templated base class for DataBucket, allows to access the transient object address as a void*.
virtual void * object()=0
T * cast(SG::IRegisterTransient *irt=0, bool isConst=true)
Return the contents of the DataBucket, converted to type T.
virtual StatusCode initialize()
This is a tool that allows streaming out of DataObjects.
virtual StatusCode connectServices(const std::string &dataStore, const std::string &cnvSvc, bool extendProvenenceRecord=false)=0
Specify which data store and conversion service to use and whether to extend provenance. Only use if o...
virtual StatusCode streamObjects(const TypeKeyPairs &typeKeys, const std::string &outputName="")=0
virtual StatusCode connectOutput(const std::string &outputName="")=0
Connect to the output stream Must connectOutput BEFORE streaming Only specify "outputName" if one wan...
virtual StatusCode commitOutput(bool doCommit=false)=0
Commit the output stream after having streamed out objects Must commitOutput AFTER streaming.
virtual void * getAsVoid(const SourceID &sid) const =0
CLID clID() const
Retrieve clid.
virtual const name_type & name() const override final
Retrieve data object key == string.
bool isValid() const
called by destructor
DataObject * accessData()
Access DataObject on-demand using conversion service.
bool hasAlias(const std::string &key) const
Test to see if a given string is in the alias set.
a Folder item (data object) is identified by the clid/key pair
CLID id() const
const std::string & key() const
bool exact() const
A set of aux data identifiers.
Definition AuxTypes.h:47
@ EVENT_STORE
Definition StoreID.h:26
STL class.
virtual std::map< unsigned int, SG::auxid_set_t > getCompressedAuxIDs(const SG::auxid_set_t &fullset) const
Return those variables that are selected to be compressed per compression setting.
virtual void setCompressedAuxIDs(const std::map< unsigned int, std::set< std::string > > &attributes)
Set which variables should be compressed per compression setting.
Class helping in dealing with dynamic branch selection.
bool match(std::string s1, std::string s2)
match the individual directories of two strings
Definition hcg.cxx:359
constexpr char AUX_POSTFIX[]
Common post-fix for the names of auxiliary containers in StoreGate.
std::map< std::string, DataProxy * > ProxyMap
Definition ProxyMap.h:22
bool fromStorable(DataObject *pDObj, T *&pTrans, bool quiet=false, IRegisterTransient *irt=0, bool isConst=true)
ProxyMap::const_iterator ConstProxyIterator
Definition ProxyMap.h:24
std::unordered_map< std::string, SG::ThinningInfo::compression_map_t > CompressionInfo
Map of compressed variables and their compression levels.
SG::ReadCondHandle< T > makeHandle(const SG::ReadCondHandleKey< T > &key, const EventContext &ctx=Gaudi::Hive::currentContext())
size_t auxid_t
Identifier for a particular aux data item.
Definition AuxTypes.h:27
STL namespace.