ATLAS Offline Software
Loading...
Searching...
No Matches
OutputStreamSequencerSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
9
11#include "MetaDataSvc.h"
12
13#include "GaudiKernel/IIncidentSvc.h"
14#include "GaudiKernel/FileIncident.h"
15#include "GaudiKernel/ConcurrencyFlags.h"
16
17#include <charconv>
18#include <format>
19#include <string_view>
20#include <sstream>
21
22
23//________________________________________________________________________________
24OutputStreamSequencerSvc::OutputStreamSequencerSvc(const std::string& name, ISvcLocator* pSvcLocator)
25 : base_class(name, pSvcLocator),
26 m_metaDataSvc("MetaDataSvc", name),
28{
29}
30
31//__________________________________________________________________________
34//__________________________________________________________________________
36 ATH_MSG_DEBUG("Initializing " << name());
37
38 // Set to be listener for end of event
39 ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name());
40 if (!incsvc.retrieve().isSuccess()) {
41 ATH_MSG_FATAL("Cannot get IncidentSvc.");
42 return(StatusCode::FAILURE);
43 }
44 if( !incidentName().empty() ) {
45 incsvc->addListener(this, incidentName(), 100);
46 incsvc->addListener(this, IncidentType::BeginProcessing, 100);
47 ATH_MSG_DEBUG("Listening to " << incidentName() << " incidents" );
48 ATH_MSG_DEBUG("Reporting is " << (m_reportingOn.value()? "ON" : "OFF") );
49 // Retrieve MetaDataSvc
50 if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) {
51 ATH_MSG_ERROR("Cannot get MetaDataSvc");
52 return StatusCode::FAILURE;
53 }
54 }
55
57 ATH_MSG_DEBUG("Concurrent events mode");
58 } else {
59 ATH_MSG_VERBOSE("Sequential events mode");
60 }
61
62 // Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() not set yet
63 // m_rangeIDinSlot.resize( );
64 std::lock_guard lockg( m_mutex );
66
67 return(StatusCode::SUCCESS);
68}
69//__________________________________________________________________________
71 // Release MetaDataSvc
72 if (!m_metaDataSvc.release().isSuccess()) {
73 ATH_MSG_WARNING("Cannot release MetaDataSvc.");
74 }
75 return(StatusCode::SUCCESS);
76}
77
78//__________________________________________________________________________
80 return Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() > 1;
81}
82
83//__________________________________________________________________________
85 std::lock_guard lockg( m_mutex );
86 return m_fileSequenceNumber >= 0;
87}
88
89//__________________________________________________________________________
// Incident handler.
// Stores the incident type in m_lastIncident, then:
//  - for an incident whose type equals incidentName(): starts a new event range
//    (or, for the special range name "dummy", only finishes the previous one);
//  - for IncidentType::BeginProcessing: makes sure m_rangeIDinSlot has an entry
//    for the current slot.
// The slot is taken from Gaudi::Hive::currentContext(); slot 0 is used when the
// context slot is EventContext::INVALID_CONTEXT_ID.
// Throws GaudiException when m_metaDataSvc->transitionMetaDataFile() does not succeed.
// NOTE(review): this listing carries the original line numbers as prefixes and
// original lines 109, 132 and 171 are absent from it - see the notes at those places.
90void OutputStreamSequencerSvc::handle(const Incident& inc)
91{
92 auto slot = Gaudi::Hive::currentContext().slot();
93 bool has_context = ( slot != EventContext::INVALID_CONTEXT_ID );
94 // in AthenaSP there is no context so go with the first slot
95 if( !has_context ) slot = 0;
96 m_lastIncident = inc.type();
97 ATH_MSG_INFO("Handling incident of type " << m_lastIncident << " for slot=" << slot
98 << (!has_context? " NO event context":"") );
99
100 if( inc.type() == incidentName() ) { // NextEventRange
101 std::string rangeID;
 // rangeID stays empty unless the incident is a FileIncident
102 const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
103 if (fileInc != nullptr) {
104 rangeID = fileInc->fileName();
105 // Handle BeginInputFile
 // the marker "INFILE" is turned into the sequence number further down
106 if (inc.type() == IncidentType::BeginInputFile) {
107 rangeID = "INFILE";
108 }
 // NOTE(review): original line 109 is missing from this listing; the two lines
 // below look like the arguments of a logging macro call - confirm against the real file
110 "Requested (through incident) Next Event Range filename extension: "
111 << rangeID);
112 }
113
 // "dummy" only ends the previous range; no new range is started for it
114 if( rangeID == "dummy" ) {
115 if( not inConcurrentEventsMode() ) {
116 // finish the previous Range here only in SEQUENTIAL (threads<2) event processing
117 // Write metadata on the incident finishing a Range (filename=="dummy") in ES MP
118 ATH_MSG_DEBUG("MetaData transition");
119 // immediate write and disconnect for ES, otherwise do it after Event write is done
120 bool disconnect { true };
 // m_lastFileName is read under m_mutex
121 std::lock_guard lockg( m_mutex );
122 if( !m_metaDataSvc->transitionMetaDataFile( m_lastFileName, disconnect ).isSuccess() ) {
123 throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
124 }
125 }
126 // exit now, wait for the next (real) incident that will start the next range
127 return;
128 }
 // scoped block: m_mutex is held only while the range bookkeeping is updated
129 {
130 // start a new range
131 std::lock_guard lockg( m_mutex );
 // NOTE(review): original line 132 is missing from this listing; presumably
 // m_fileSequenceNumber is updated there - confirm against the real file
133 if( rangeID.empty() ) {
 // no name supplied: build "_" followed by the sequence number zero-padded to width 4
134 std::ostringstream n;
135 n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber;
136 rangeID = n.str();
137 ATH_MSG_DEBUG("Default next event range filename extension: " << rangeID);
138 }
139 else if (rangeID == "INFILE") {
 // BeginInputFile case: the range ID is the plain (unpadded) sequence number
140 rangeID = std::to_string(m_fileSequenceNumber);
141 }
142 if( slot >= m_rangeIDinSlot.size() ) {
143 // MN - late resize, is there a better place for it?
144 m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
145 }
146 // from now on new events will use the new rangeID
147 m_currentRangeID = rangeID;
148 // for ESMT these incidents are asynchronous, so wait for BeginProcessing to update the range map
149 if( not inConcurrentEventsMode() or has_context ) {
 // rangeID is moved from here and must not be read again below
150 m_rangeIDinSlot[ slot ] = std::move(rangeID);
151 }
152 }
153 if( not inConcurrentEventsMode() and not fileInc ) {
154 // non-file incident case (filename=="") in regular SP LoopMgr
155 ATH_MSG_DEBUG("MetaData transition");
156 bool disconnect { false };
157 // MN: may not know the full filename yet, but that is only needed for disconnect==true
158 if( !m_metaDataSvc->transitionMetaDataFile( "" /*m_lastFileName*/, disconnect ).isSuccess() ) {
159 throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
160 }
161 }
162 }
163 else if( inc.type() == IncidentType::BeginProcessing ) {
164 // new event start - assign current rangeId to its slot
165 std::lock_guard lockg( m_mutex );
166 ATH_MSG_DEBUG("Assigning rangeID = " << m_currentRangeID << " to slot " << slot);
167 // If this service is enabled but not getting NextRange incidents, need to resize here
168 if( slot >= m_rangeIDinSlot.size() ) {
169 m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
170 }
 // NOTE(review): original line 171 is missing from this listing; presumably it stores
 // m_currentRangeID into m_rangeIDinSlot[slot], as the debug message above announces - confirm
172 }
173}
174
175//__________________________________________________________________________
// Build the output file name for the current event range.
// Parameter orgFileName: the configured output file name; it may contain a
//   "[...]" section.
// Returns orgFileName unchanged when inUse() is false; otherwise the newly built
//   name, which is also stored in m_lastFileName.
// Two modes, selected by m_replaceRangeMode:
//  - off: "." + rangeID is inserted in front of the first '[' (or appended when
//         there is no '[');
//  - on : the "[a,b,...]" list is replaced by the element whose index is the
//         integer value of rangeID, with fall-backs to the rangeID text itself.
// When m_reportingOn is set, the pair (file name, rangeID) is inserted into m_fnToRangeId.
// NOTE(review): this listing carries the original line numbers as prefixes and
// original lines 224, 227, 231 and 234 are absent from it - see the notes at those places.
176std::string OutputStreamSequencerSvc::buildSequenceFileName(const std::string& orgFileName)
177{
178 if( !inUse() ) {
179 // Event sequences not in use, just return the original filename
180 return orgFileName;
181 }
 // the range ID is fetched before m_mutex is locked on the next line
182 std::string rangeID = currentRangeID();
183 std::lock_guard lockg( m_mutex );
184 if (!m_replaceRangeMode) {
185 // build the full output file name for this event range
186 std::string fileNameCore = orgFileName, fileNameExt;
 // everything from the first '[' onwards is kept as a trailing extension
187 std::size_t sepPos = orgFileName.find('[');
188 if (sepPos != std::string::npos) {
189 fileNameCore = orgFileName.substr(0, sepPos);
190 fileNameExt = orgFileName.substr(sepPos);
191 }
192 std::ostringstream n;
193 n << fileNameCore << "." << rangeID << fileNameExt;
194 m_lastFileName = n.str();
195 } else {
196 std::string_view origFileNameView = orgFileName;
 // first '[' and first ']' in the name; their relative order is not checked here
197 std::size_t open = origFileNameView.find('[');
198 std::size_t close = origFileNameView.find(']');
199 // If we don't find a [ ] enclosed section, just append the rangeID to the
200 // end
201 if (open == std::string_view::npos || close == std::string_view::npos) {
202 m_lastFileName = std::format("{}.{}", origFileNameView, rangeID);
203 } else {
204 // build list of elems to substitute from
205 ATH_MSG_DEBUG("Building element list");
 // the views in elems point into orgFileName, which outlives this function body
206 std::vector<std::string_view> elems{};
207 std::size_t pos = open + 1;
 // split on commas located before the closing ']'; the loop also stops when find() returns npos
208 for (std::size_t comma = origFileNameView.find(',', pos);
209 comma < close;
210 comma = origFileNameView.find(',', pos)) {
211 std::string_view item = origFileNameView.substr(pos, comma - pos);
212 ATH_MSG_DEBUG("(start) pos = " << pos << ", (end) comma = " << comma << ", item = " << item);
213 elems.push_back(item);
214 pos = comma + 1;
215 }
 // the text between the last comma (or '[') and ']' is the final element
216 std::string_view last_item = origFileNameView.substr(pos, close - pos);
217 ATH_MSG_DEBUG("(start) pos = " << pos << ", (end) close = " << close << ", item = " << last_item);
218 elems.push_back(last_item);
219 // substitute
 // rangeID is parsed as an unsigned index; only the error code is checked,
 // so trailing non-digit characters after a valid number are not rejected
220 std::size_t rangeIdx{};
221 auto rangeIdxParseRes = std::from_chars(
222 rangeID.data(), rangeID.data() + rangeID.size(), rangeIdx);
223 if (rangeIdxParseRes.ec != std::errc()) {
 // NOTE(review): original lines 224 and 227 are missing from this listing; presumably a
 // warning macro opens before the message and the format result is assigned to m_lastFileName - confirm
225 "Error parsing rangeID to integer. Replacing [] list with "
226 "rangeID.");
228 std::format("{}{}{}", origFileNameView.substr(0, open), rangeID,
229 origFileNameView.substr(close + 1));
230 } else if (rangeIdx >= elems.size()) {
 // NOTE(review): original lines 231 and 234 are missing from this listing; presumably the
 // same warning + assignment pattern as in the branch above - confirm
232 "Number of elements in [] list <= rangeID. Replacing [] list with "
233 "rangeID.");
235 std::format("{}{}{}", origFileNameView.substr(0, open), rangeID,
236 origFileNameView.substr(close + 1));
237 } else {
 // normal case: the whole "[...]" section is replaced by the selected element
238 m_lastFileName = std::format(
239 "{}{}{}", origFileNameView.substr(0, open), elems.at(rangeIdx),
240 origFileNameView.substr(close + 1));
241 ATH_MSG_DEBUG("Output file: " << m_lastFileName);
242 }
243 }
244 }
245
246 if( m_reportingOn.value() ) {
 // remember which range ID produced this file name (insert() leaves an existing entry untouched)
247 m_fnToRangeId.insert( std::pair(m_lastFileName, rangeID) );
248 }
249
250 return m_lastFileName;
251}
252
253
255{
256 if( !inUse() ) return "";
257 auto slot = Gaudi::Hive::currentContext().slot();
258 if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
259 std::lock_guard lockg( m_mutex );
260 if( slot >= m_rangeIDinSlot.size() ) return "";
261 return m_rangeIDinSlot[ slot ];
262}
263
264
265std::string OutputStreamSequencerSvc::setRangeID(const std::string & rangeID)
266{
267 auto slot = Gaudi::Hive::currentContext().slot();
268 if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
269 std::lock_guard lockg( m_mutex );
270 if( slot >= m_rangeIDinSlot.size() ) {
271 throw std::runtime_error("OutputStreamSequencer::setRangeID(): slot out of range");
272 }
273 std::string oldrange = m_rangeIDinSlot[ slot ];
274 m_rangeIDinSlot[ slot ] = rangeID;
275 return oldrange;
276}
277
278
279void OutputStreamSequencerSvc::publishRangeReport(const std::string& outputFile)
280{
281 std::lock_guard lockg( m_mutex );
282 m_finishedRange = m_fnToRangeId.find(outputFile);
283}
284
286{
287 RangeReport_ptr report;
288 if( !m_reportingOn.value() ) {
289 ATH_MSG_WARNING("Reporting not turned on - set " << m_reportingOn.name() << " to True");
290 } else {
291 std::lock_guard lockg( m_mutex );
292 if(m_finishedRange!=m_fnToRangeId.end()) {
293 report = std::make_unique<RangeReport_t>(m_finishedRange->second,m_finishedRange->first);
296 }
297 }
298 return report;
299}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the MetaDataSvc class.
This file contains the class definition for the OutputStreamSequencerSvc class.
static const Attributes_t empty
void publishRangeReport(const std::string &outputFile)
bool inUse() const
Is the service in active use? (true after the first range incident is handled)
OutputStreamSequencerSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
virtual void handle(const Incident &) override final
Incident service handle.
std::string currentRangeID() const
The current Event Range ID (only one range is returned)
std::string m_lastFileName
Recently constructed full file name (useful in single threaded processing)
std::string setRangeID(const std::string &rangeID)
set the RangeID (possibly temporarily) so the right Range Filename may be generated
ServiceHandle< MetaDataSvc > m_metaDataSvc
int m_fileSequenceNumber
The event sequence number.
BooleanProperty m_replaceRangeMode
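Flag to put in ReplaceRangeMode (i.e. buildSequenceFileName() replaces the [...] list in the file name instead of inserting the range ID in front of it).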
virtual StatusCode finalize() override final
Required of all Gaudi services:
std::string incidentName() const
The name of the incident that starts a new event sequence.
std::string buildSequenceFileName(const std::string &)
Returns sequenced file name for output stream.
std::string m_currentRangeID
Current EventRange ID constructed on the last NextRange incident.
virtual ~OutputStreamSequencerSvc()
Destructor.
std::vector< std::string > m_rangeIDinSlot
EventRange ID for all slots.
BooleanProperty m_reportingOn
Flag to switch on storage of reporting info in fnToRangeId.
static bool inConcurrentEventsMode()
Are there concurrent events? (threads>1)
std::string m_lastIncident
Last incident type that was handled.
std::unique_ptr< RangeReport_t > RangeReport_ptr
std::map< std::string, std::string >::iterator m_finishedRange
virtual StatusCode initialize() override final
Required of all Gaudi services:
std::map< std::string, std::string > m_fnToRangeId