ATLAS Offline Software
Loading...
Searching...
No Matches
OutputStreamSequencerSvc Class Reference

This class provides configuration properties to enable OutputStream file sequences. More...

#include <OutputStreamSequencerSvc.h>

Inheritance diagram for OutputStreamSequencerSvc:
Collaboration diagram for OutputStreamSequencerSvc:

Public Types

typedef std::pair< std::string, std::string > RangeReport_t
typedef std::unique_ptr< RangeReport_tRangeReport_ptr

Public Member Functions

 OutputStreamSequencerSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor.
virtual ~OutputStreamSequencerSvc ()
 Destructor.
virtual StatusCode initialize () override final
 Required of all Gaudi services:
virtual StatusCode finalize () override final
 Required of all Gaudi services:
virtual void handle (const Incident &) override final
 Incident service handle.
std::string buildSequenceFileName (const std::string &)
 Returns sequenced file name for output stream.
void publishRangeReport (const std::string &outputFile)
RangeReport_ptr getRangeReport ()
std::string incidentName () const
 The name of the incident that starts a new event sequence.
std::string currentRangeID () const
 The current Event Range ID (only one range is returned)
std::string setRangeID (const std::string &rangeID)
 set the RangeID (possibly temporarily) so the right Range Filename may be generated
bool inUse () const
 Is the service in active use? (true after the first range incident is handled)
const std::string & lastIncident ()
 Last incident type that was handled.

Static Public Member Functions

static bool inConcurrentEventsMode ()
 Are there concurrent events? (threads>1)

Private Attributes

ServiceHandle< MetaDataSvcm_metaDataSvc
int m_fileSequenceNumber {}
 The event sequence number.
std::string m_currentRangeID
 Current EventRange ID constructed on the last NextRange incident.
std::string m_lastFileName
 Recently constructed full file name (useful in single threaded processing)
std::string m_lastIncident
 Last incident type that was handled.
std::vector< std::string > m_rangeIDinSlot
 EventRange ID for all slots.
StringProperty m_incidentName
 SequenceIncidentName, incident name for triggering file sequencing.
BooleanProperty m_reportingOn
 Flag to switch on storage of reporting info in fnToRangeId.
BooleanProperty m_replaceRangeMode
 Flag to put in ReplaceRangeMode (i.e.
std::map< std::string, std::string > m_fnToRangeId
std::map< std::string, std::string >::iterator m_finishedRange {}
std::mutex m_mutex

Detailed Description

This class provides configuration properties to enable OutputStream file sequences.

Definition at line 29 of file OutputStreamSequencerSvc.h.

Member Typedef Documentation

◆ RangeReport_ptr

Definition at line 35 of file OutputStreamSequencerSvc.h.

◆ RangeReport_t

typedef std::pair<std::string,std::string> OutputStreamSequencerSvc::RangeReport_t

Definition at line 34 of file OutputStreamSequencerSvc.h.

Constructor & Destructor Documentation

◆ OutputStreamSequencerSvc()

OutputStreamSequencerSvc::OutputStreamSequencerSvc ( const std::string & name,
ISvcLocator * pSvcLocator )

Standard Service Constructor.

Definition at line 24 of file OutputStreamSequencerSvc.cxx.

25 : base_class(name, pSvcLocator),
26 m_metaDataSvc("MetaDataSvc", name),
28{
29}
ServiceHandle< MetaDataSvc > m_metaDataSvc
int m_fileSequenceNumber
The event sequence number.

◆ ~OutputStreamSequencerSvc()

OutputStreamSequencerSvc::~OutputStreamSequencerSvc ( )
virtual

Destructor.

Definition at line 32 of file OutputStreamSequencerSvc.cxx.

32 {
33}

Member Function Documentation

◆ buildSequenceFileName()

std::string OutputStreamSequencerSvc::buildSequenceFileName ( const std::string & orgFileName)

Returns sequenced file name for output stream.

Definition at line 176 of file OutputStreamSequencerSvc.cxx.

177{
178 if( !inUse() ) {
179 // Event sequences not in use, just return the original filename
180 return orgFileName;
181 }
182 std::string rangeID = currentRangeID();
183 std::lock_guard lockg( m_mutex );
184 if (!m_replaceRangeMode) {
185 // build the full output file name for this event range
186 std::string fileNameCore = orgFileName, fileNameExt;
187 std::size_t sepPos = orgFileName.find('[');
188 if (sepPos != std::string::npos) {
189 fileNameCore = orgFileName.substr(0, sepPos);
190 fileNameExt = orgFileName.substr(sepPos);
191 }
192 std::ostringstream n;
193 n << fileNameCore << "." << rangeID << fileNameExt;
194 m_lastFileName = n.str();
195 } else {
196 std::string_view origFileNameView = orgFileName;
197 std::size_t open = origFileNameView.find('[');
198 std::size_t close = origFileNameView.find(']');
199 // If we don't find a [ ] enclosed section, just append the rangeID to the
200 // end
201 if (open == std::string_view::npos || close == std::string_view::npos) {
202 m_lastFileName = std::format("{}.{}", origFileNameView, rangeID);
203 } else {
204 // build list of elems to substitute from
205 ATH_MSG_DEBUG("Building element list");
206 std::vector<std::string_view> elems{};
207 std::size_t pos = open + 1;
208 for (std::size_t comma = origFileNameView.find(',', pos);
209 comma < close;
210 comma = origFileNameView.find(',', pos)) {
211 std::string_view item = origFileNameView.substr(pos, comma - pos);
212 ATH_MSG_DEBUG("(start) pos = " << pos << ", (end) comma = " << comma << ", item = " << item);
213 elems.push_back(item);
214 pos = comma + 1;
215 }
216 std::string_view last_item = origFileNameView.substr(pos, close - pos);
217 ATH_MSG_DEBUG("(start) pos = " << pos << ", (end) close = " << close << ", item = " << last_item);
218 elems.push_back(last_item);
219 // substitute
220 std::size_t rangeIdx{};
221 auto rangeIdxParseRes = std::from_chars(
222 rangeID.data(), rangeID.data() + rangeID.size(), rangeIdx);
223 if (rangeIdxParseRes.ec != std::errc()) {
225 "Error parsing rangeID to integer. Replacing [] list with "
226 "rangeID.");
228 std::format("{}{}{}", origFileNameView.substr(0, open), rangeID,
229 origFileNameView.substr(close + 1));
230 } else if (rangeIdx >= elems.size()) {
232 "Number of elements in [] list <= rangeID. Replacing [] list with "
233 "rangeID.");
235 std::format("{}{}{}", origFileNameView.substr(0, open), rangeID,
236 origFileNameView.substr(close + 1));
237 } else {
238 m_lastFileName = std::format(
239 "{}{}{}", origFileNameView.substr(0, open), elems.at(rangeIdx),
240 origFileNameView.substr(close + 1));
241 ATH_MSG_DEBUG("Output file: " << m_lastFileName);
242 }
243 }
244 }
245
246 if( m_reportingOn.value() ) {
247 m_fnToRangeId.insert( std::pair(m_lastFileName, rangeID) );
248 }
249
250 return m_lastFileName;
251}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
bool inUse() const
Is the service in active use? (true after the first range incident is handled)
std::string currentRangeID() const
The current Event Range ID (only one range is returned)
std::string m_lastFileName
Recently constructed full file name (useful in single threaded processing)
BooleanProperty m_replaceRangeMode
Flag to put in ReplaceRangeMode (i.e.
BooleanProperty m_reportingOn
Flag to switch on storage of reporting info in fnToRangeId.
std::map< std::string, std::string > m_fnToRangeId
@ open
Definition BinningType.h:40

◆ currentRangeID()

std::string OutputStreamSequencerSvc::currentRangeID ( ) const

The current Event Range ID (only one range is returned)

Definition at line 254 of file OutputStreamSequencerSvc.cxx.

255{
256 if( !inUse() ) return "";
257 auto slot = Gaudi::Hive::currentContext().slot();
258 if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
259 std::lock_guard lockg( m_mutex );
260 if( slot >= m_rangeIDinSlot.size() ) return "";
261 return m_rangeIDinSlot[ slot ];
262}
std::vector< std::string > m_rangeIDinSlot
EventRange ID for all slots.

◆ finalize()

StatusCode OutputStreamSequencerSvc::finalize ( )
finaloverridevirtual

Required of all Gaudi services:

Definition at line 70 of file OutputStreamSequencerSvc.cxx.

70 {
71 // Release MetaDataSvc
72 if (!m_metaDataSvc.release().isSuccess()) {
73 ATH_MSG_WARNING("Cannot release MetaDataSvc.");
74 }
75 return(StatusCode::SUCCESS);
76}

◆ getRangeReport()

OutputStreamSequencerSvc::RangeReport_ptr OutputStreamSequencerSvc::getRangeReport ( )

Definition at line 285 of file OutputStreamSequencerSvc.cxx.

286{
288 if( !m_reportingOn.value() ) {
289 ATH_MSG_WARNING("Reporting not turned on - set " << m_reportingOn.name() << " to True");
290 } else {
291 std::lock_guard lockg( m_mutex );
292 if(m_finishedRange!=m_fnToRangeId.end()) {
293 report = std::make_unique<RangeReport_t>(m_finishedRange->second,m_finishedRange->first);
296 }
297 }
298 return report;
299}
std::unique_ptr< RangeReport_t > RangeReport_ptr
std::map< std::string, std::string >::iterator m_finishedRange
list report
Definition checkTP.py:125

◆ handle()

void OutputStreamSequencerSvc::handle ( const Incident & inc)
finaloverridevirtual

Incident service handle.

Definition at line 90 of file OutputStreamSequencerSvc.cxx.

91{
92 auto slot = Gaudi::Hive::currentContext().slot();
93 bool has_context = ( slot != EventContext::INVALID_CONTEXT_ID );
94 // in AthenaSP there is no context so go with the first slot
95 if( !has_context ) slot = 0;
96 m_lastIncident = inc.type();
97 ATH_MSG_INFO("Handling incident of type " << m_lastIncident << " for slot=" << slot
98 << (!has_context? " NO event context":"") );
99
100 if( inc.type() == incidentName() ) { // NextEventRange
101 std::string rangeID;
102 const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
103 if (fileInc != nullptr) {
104 rangeID = fileInc->fileName();
105 // Handle BeginInputFile
106 if (inc.type() == IncidentType::BeginInputFile) {
107 rangeID = "INFILE";
108 }
110 "Requested (through incident) Next Event Range filename extension: "
111 << rangeID);
112 }
113
114 if( rangeID == "dummy" ) {
115 if( not inConcurrentEventsMode() ) {
116 // finish the previous Range here only in SEQUENTIAL (threads<2) event processing
117 // Write metadata on the incident finishing a Range (filename=="dummy") in ES MP
118 ATH_MSG_DEBUG("MetaData transition");
119 // immediate write and disconnect for ES, otherwise do it after Event write is done
120 bool disconnect { true };
121 std::lock_guard lockg( m_mutex );
122 if( !m_metaDataSvc->transitionMetaDataFile( m_lastFileName, disconnect ).isSuccess() ) {
123 throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
124 }
125 }
126 // exit now, wait for the next (real) incident that will start the next range
127 return;
128 }
129 {
130 // start a new range
131 std::lock_guard lockg( m_mutex );
133 if( rangeID.empty() ) {
134 std::ostringstream n;
135 n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber;
136 rangeID = n.str();
137 ATH_MSG_DEBUG("Default next event range filename extension: " << rangeID);
138 }
139 else if (rangeID == "INFILE") {
140 rangeID = std::to_string(m_fileSequenceNumber);
141 }
142 if( slot >= m_rangeIDinSlot.size() ) {
143 // MN - late resize, is there a better place for it?
144 m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
145 }
146 // from now on new events will use the new rangeID
147 m_currentRangeID = rangeID;
148 // for ESMT these incidents are asynchronous, so wait for BeginProcessing to update the range map
149 if( not inConcurrentEventsMode() or has_context ) {
150 m_rangeIDinSlot[ slot ] = std::move(rangeID);
151 }
152 }
153 if( not inConcurrentEventsMode() and not fileInc ) {
154 // non-file incident case (filename=="") in regular SP LoopMgr
155 ATH_MSG_DEBUG("MetaData transition");
156 bool disconnect { false };
157 // MN: may not know the full filename yet, but that is only needed for disconnect==true
158 if( !m_metaDataSvc->transitionMetaDataFile( "" /*m_lastFileName*/, disconnect ).isSuccess() ) {
159 throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
160 }
161 }
162 }
163 else if( inc.type() == IncidentType::BeginProcessing ) {
164 // new event start - assing current rangeId to its slot
165 std::lock_guard lockg( m_mutex );
166 ATH_MSG_DEBUG("Assigning rangeID = " << m_currentRangeID << " to slot " << slot);
167 // If this service is enabled but not getting NextRange incidents, need to resize here
168 if( slot >= m_rangeIDinSlot.size() ) {
169 m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
170 }
172 }
173}
#define ATH_MSG_INFO(x)
std::string incidentName() const
The name of the incident that starts a new event sequence.
std::string m_currentRangeID
Current EventRange ID constructed on the last NextRange incident.
static bool inConcurrentEventsMode()
Are there concurrent events? (threads>1)
std::string m_lastIncident
Last incident type that was handled.

◆ incidentName()

std::string OutputStreamSequencerSvc::incidentName ( ) const
inline

The name of the incident that starts a new event sequence.

Definition at line 59 of file OutputStreamSequencerSvc.h.

59{ return m_incidentName.value(); }
StringProperty m_incidentName
SequenceIncidentName, incident name for triggering file sequencing.

◆ inConcurrentEventsMode()

bool OutputStreamSequencerSvc::inConcurrentEventsMode ( )
static

Are there concurrent events? (threads>1)

Definition at line 79 of file OutputStreamSequencerSvc.cxx.

79 {
80 return Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() > 1;
81}

◆ initialize()

StatusCode OutputStreamSequencerSvc::initialize ( )
finaloverridevirtual

Required of all Gaudi services:

Definition at line 35 of file OutputStreamSequencerSvc.cxx.

35 {
36 ATH_MSG_DEBUG("Initializing " << name());
37
38 // Set to be listener for end of event
39 ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name());
40 if (!incsvc.retrieve().isSuccess()) {
41 ATH_MSG_FATAL("Cannot get IncidentSvc.");
42 return(StatusCode::FAILURE);
43 }
44 if( !incidentName().empty() ) {
45 incsvc->addListener(this, incidentName(), 100);
46 incsvc->addListener(this, IncidentType::BeginProcessing, 100);
47 ATH_MSG_DEBUG("Listening to " << incidentName() << " incidents" );
48 ATH_MSG_DEBUG("Reporting is " << (m_reportingOn.value()? "ON" : "OFF") );
49 // Retrieve MetaDataSvc
50 if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) {
51 ATH_MSG_ERROR("Cannot get MetaDataSvc");
52 return StatusCode::FAILURE;
53 }
54 }
55
57 ATH_MSG_DEBUG("Concurrent events mode");
58 } else {
59 ATH_MSG_VERBOSE("Sequential events mode");
60 }
61
62 // Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() not set yet
63 // m_rangeIDinSlot.resize( );
64 std::lock_guard lockg( m_mutex );
66
67 return(StatusCode::SUCCESS);
68}
#define ATH_MSG_FATAL(x)
#define ATH_MSG_VERBOSE(x)
static const Attributes_t empty

◆ inUse()

bool OutputStreamSequencerSvc::inUse ( ) const

Is the service in active use? (true after the first range incident is handled)

Definition at line 84 of file OutputStreamSequencerSvc.cxx.

84 {
85 std::lock_guard lockg( m_mutex );
86 return m_fileSequenceNumber >= 0;
87}

◆ lastIncident()

const std::string & OutputStreamSequencerSvc::lastIncident ( )
inline

Last incident type that was handled.

Definition at line 74 of file OutputStreamSequencerSvc.h.

74{ return m_lastIncident; }

◆ publishRangeReport()

void OutputStreamSequencerSvc::publishRangeReport ( const std::string & outputFile)

Definition at line 279 of file OutputStreamSequencerSvc.cxx.

280{
281 std::lock_guard lockg( m_mutex );
282 m_finishedRange = m_fnToRangeId.find(outputFile);
283}

◆ setRangeID()

std::string OutputStreamSequencerSvc::setRangeID ( const std::string & rangeID)

set the RangeID (possibly temporarily) so the right Range Filename may be generated

Definition at line 265 of file OutputStreamSequencerSvc.cxx.

266{
267 auto slot = Gaudi::Hive::currentContext().slot();
268 if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
269 std::lock_guard lockg( m_mutex );
270 if( slot >= m_rangeIDinSlot.size() ) {
271 throw std::runtime_error("OutputStreamSequencer::setRangeID(): slot out of range");
272 }
273 std::string oldrange = m_rangeIDinSlot[ slot ];
274 m_rangeIDinSlot[ slot ] = rangeID;
275 return oldrange;
276}

Member Data Documentation

◆ m_currentRangeID

std::string OutputStreamSequencerSvc::m_currentRangeID
private

Current EventRange ID constructed on the last NextRange incident.

Definition at line 83 of file OutputStreamSequencerSvc.h.

◆ m_fileSequenceNumber

int OutputStreamSequencerSvc::m_fileSequenceNumber {}
private

The event sequence number.

Definition at line 80 of file OutputStreamSequencerSvc.h.

80{};

◆ m_finishedRange

std::map<std::string,std::string>::iterator OutputStreamSequencerSvc::m_finishedRange {}
private

Definition at line 111 of file OutputStreamSequencerSvc.h.

111{};

◆ m_fnToRangeId

std::map<std::string,std::string> OutputStreamSequencerSvc::m_fnToRangeId
private

Definition at line 110 of file OutputStreamSequencerSvc.h.

◆ m_incidentName

StringProperty OutputStreamSequencerSvc::m_incidentName
private
Initial value:
{this, "SequenceIncidentName", "",
"Name of the incident that signals the next Event Range start" }

SequenceIncidentName, incident name for triggering file sequencing.

Definition at line 96 of file OutputStreamSequencerSvc.h.

96 {this, "SequenceIncidentName", "",
97 "Name of the incident that signals the next Event Range start" };

◆ m_lastFileName

std::string OutputStreamSequencerSvc::m_lastFileName
private

Recently constructed full file name (useful in single threaded processing)

Definition at line 86 of file OutputStreamSequencerSvc.h.

◆ m_lastIncident

std::string OutputStreamSequencerSvc::m_lastIncident
private

Last incident type that was handled.

Definition at line 89 of file OutputStreamSequencerSvc.h.

◆ m_metaDataSvc

ServiceHandle<MetaDataSvc> OutputStreamSequencerSvc::m_metaDataSvc
private

Definition at line 77 of file OutputStreamSequencerSvc.h.

◆ m_mutex

std::mutex OutputStreamSequencerSvc::m_mutex
mutableprivate

Definition at line 113 of file OutputStreamSequencerSvc.h.

◆ m_rangeIDinSlot

std::vector<std::string> OutputStreamSequencerSvc::m_rangeIDinSlot
private

EventRange ID for all slots.

Definition at line 92 of file OutputStreamSequencerSvc.h.

◆ m_replaceRangeMode

BooleanProperty OutputStreamSequencerSvc::m_replaceRangeMode
private
Initial value:
{
this, "ReplaceRangeMode", false,
"If True, everything between [ and ] in the output filename is treated "
"as a comma-separated list, and the range_idth element of the list is "
"selected"}

Flag to put in ReplaceRangeMode (i.e.

everything between [ and ] (inclusive) is replaced with the range_idth element of that list)

Definition at line 104 of file OutputStreamSequencerSvc.h.

104 {
105 this, "ReplaceRangeMode", false,
106 "If True, everything between [ and ] in the output filename is treated "
107 "as a comma-separated list, and the range_idth element of the list is "
108 "selected"};

◆ m_reportingOn

BooleanProperty OutputStreamSequencerSvc::m_reportingOn
private
Initial value:
{this, "ReportingOn", false,
"If True, keep info about Ranges for getRangeReport() calls"}

Flag to switch on storage of reporting info in fnToRangeId.

Definition at line 99 of file OutputStreamSequencerSvc.h.

99 {this, "ReportingOn", false,
100 "If True, keep info about Ranges for getRangeReport() calls"};

The documentation for this class was generated from the following files: