ATLAS Offline Software
Loading...
Searching...
No Matches
OutputStreamSequencerSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
9
11#include "MetaDataSvc.h"
12
13#include "GaudiKernel/IIncidentSvc.h"
14#include "GaudiKernel/FileIncident.h"
15#include "GaudiKernel/ConcurrencyFlags.h"
16
17#include <charconv>
18#include <format>
19#include <string_view>
20#include <sstream>
21
22
23//________________________________________________________________________________
24OutputStreamSequencerSvc::OutputStreamSequencerSvc(const std::string& name, ISvcLocator* pSvcLocator)
25 : base_class(name, pSvcLocator),
26 m_metaDataSvc("MetaDataSvc", name),
28{
29}
30
31//__________________________________________________________________________
34//__________________________________________________________________________
36 ATH_MSG_DEBUG("Initializing " << name());
37
38 // Set to be listener for end of event
39 ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name());
40 if (!incsvc.retrieve().isSuccess()) {
41 ATH_MSG_FATAL("Cannot get IncidentSvc.");
42 return(StatusCode::FAILURE);
43 }
44 if( !incidentName().empty() ) {
45 incsvc->addListener(this, incidentName(), 100);
46 incsvc->addListener(this, IncidentType::BeginProcessing, 100);
47 ATH_MSG_DEBUG("Listening to " << incidentName() << " incidents" );
48 ATH_MSG_DEBUG("Reporting is " << (m_reportingOn.value()? "ON" : "OFF") );
49 // Retrieve MetaDataSvc
50 if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) {
51 ATH_MSG_ERROR("Cannot get MetaDataSvc");
52 return StatusCode::FAILURE;
53 }
54 }
55
57 ATH_MSG_DEBUG("Concurrent events mode");
58 } else {
59 ATH_MSG_VERBOSE("Sequential events mode");
60 }
61
63
64 return(StatusCode::SUCCESS);
65}
66//__________________________________________________________________________
68 // Release MetaDataSvc
69 if (!m_metaDataSvc.release().isSuccess()) {
70 ATH_MSG_WARNING("Cannot release MetaDataSvc.");
71 }
72 return(StatusCode::SUCCESS);
73}
74
75//__________________________________________________________________________
77 return Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() > 1;
78}
79
80//__________________________________________________________________________
82 std::lock_guard lockg( m_mutex );
83 return m_fileSequenceNumber >= 0;
84}
85
86//__________________________________________________________________________
87void OutputStreamSequencerSvc::handle(const Incident& inc)
88{
89 const EventContext& ctx = inc.context();
90 m_lastIncident = inc.type();
91 ATH_MSG_INFO("Handling incident of type " << m_lastIncident << " for " << ctx);
92
93 if( inc.type() == incidentName() ) { // NextEventRange
94 std::string rangeID;
95 const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
96 if (fileInc != nullptr) {
97 rangeID = fileInc->fileName();
98 // Handle BeginInputFile
99 if (inc.type() == IncidentType::BeginInputFile) {
100 rangeID = "INFILE";
101 }
103 "Requested (through incident) Next Event Range filename extension: "
104 << rangeID);
105 }
106
107 if( rangeID == "dummy" ) {
108 if( not inConcurrentEventsMode() ) {
109 // finish the previous Range here only in SEQUENTIAL (threads<2) event processing
110 // Write metadata on the incident finishing a Range (filename=="dummy") in ES MP
111 ATH_MSG_DEBUG("MetaData transition");
112 // immediate write and disconnect for ES, otherwise do it after Event write is done
113 bool disconnect { true };
114 std::lock_guard lockg( m_mutex );
115 if( !m_metaDataSvc->transitionMetaDataFile( m_lastFileName, disconnect ).isSuccess() ) {
116 throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
117 }
118 }
119 // exit now, wait for the next (real) incident that will start the next range
120 return;
121 }
122 {
123 // start a new range
124 std::lock_guard lockg( m_mutex );
126 if( rangeID.empty() ) {
127 std::ostringstream n;
128 n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber;
129 rangeID = n.str();
130 ATH_MSG_DEBUG("Default next event range filename extension: " << rangeID);
131 }
132 else if (rangeID == "INFILE") {
133 rangeID = std::to_string(m_fileSequenceNumber);
134 }
135 // from now on new events will use the new rangeID
136 m_currentRangeID = rangeID;
137 // for ESMT these incidents are asynchronous, so wait for BeginProcessing to update the range map
138 if( not inConcurrentEventsMode() or ctx.valid() ) {
139 *m_rangeIDinSlot.get(ctx) = std::move(rangeID);
140 }
141 }
142 if( not inConcurrentEventsMode() and not fileInc ) {
143 // non-file incident case (filename=="") in regular SP LoopMgr
144 ATH_MSG_DEBUG("MetaData transition");
145 bool disconnect { false };
146 // MN: may not know the full filename yet, but that is only needed for disconnect==true
147 if( !m_metaDataSvc->transitionMetaDataFile( "" /*m_lastFileName*/, disconnect ).isSuccess() ) {
148 throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
149 }
150 }
151 }
152 else if( inc.type() == IncidentType::BeginProcessing ) {
153 // new event start - assing current rangeId to its slot
154 std::lock_guard lockg( m_mutex );
155 ATH_MSG_DEBUG("Assigning rangeID = " << m_currentRangeID << " to slot " << ctx.slot());
157 }
158}
159
160//__________________________________________________________________________
161std::string OutputStreamSequencerSvc::buildSequenceFileName(const std::string& orgFileName)
162{
163 if( !inUse() ) {
164 // Event sequences not in use, just return the original filename
165 return orgFileName;
166 }
167 std::string rangeID = currentRangeID();
168 std::lock_guard lockg( m_mutex );
169 if (!m_replaceRangeMode) {
170 // build the full output file name for this event range
171 std::string fileNameCore = orgFileName, fileNameExt;
172 std::size_t sepPos = orgFileName.find('[');
173 if (sepPos != std::string::npos) {
174 fileNameCore = orgFileName.substr(0, sepPos);
175 fileNameExt = orgFileName.substr(sepPos);
176 }
177 std::ostringstream n;
178 n << fileNameCore << "." << rangeID << fileNameExt;
179 m_lastFileName = n.str();
180 } else {
181 std::string_view origFileNameView = orgFileName;
182 std::size_t open = origFileNameView.find('[');
183 std::size_t close = origFileNameView.find(']');
184 // If we don't find a [ ] enclosed section, just append the rangeID to the
185 // end
186 if (open == std::string_view::npos || close == std::string_view::npos) {
187 m_lastFileName = std::format("{}.{}", origFileNameView, rangeID);
188 } else {
189 // build list of elems to substitute from
190 ATH_MSG_DEBUG("Building element list");
191 std::vector<std::string_view> elems{};
192 std::size_t pos = open + 1;
193 for (std::size_t comma = origFileNameView.find(',', pos);
194 comma < close;
195 comma = origFileNameView.find(',', pos)) {
196 std::string_view item = origFileNameView.substr(pos, comma - pos);
197 ATH_MSG_DEBUG("(start) pos = " << pos << ", (end) comma = " << comma << ", item = " << item);
198 elems.push_back(item);
199 pos = comma + 1;
200 }
201 std::string_view last_item = origFileNameView.substr(pos, close - pos);
202 ATH_MSG_DEBUG("(start) pos = " << pos << ", (end) close = " << close << ", item = " << last_item);
203 elems.push_back(last_item);
204 // substitute
205 std::size_t rangeIdx{};
206 auto rangeIdxParseRes = std::from_chars(
207 rangeID.data(), rangeID.data() + rangeID.size(), rangeIdx);
208 if (rangeIdxParseRes.ec != std::errc()) {
210 "Error parsing rangeID to integer. Replacing [] list with "
211 "rangeID.");
213 std::format("{}{}{}", origFileNameView.substr(0, open), rangeID,
214 origFileNameView.substr(close + 1));
215 } else if (rangeIdx >= elems.size()) {
217 "Number of elements in [] list <= rangeID. Replacing [] list with "
218 "rangeID.");
220 std::format("{}{}{}", origFileNameView.substr(0, open), rangeID,
221 origFileNameView.substr(close + 1));
222 } else {
223 m_lastFileName = std::format(
224 "{}{}{}", origFileNameView.substr(0, open), elems.at(rangeIdx),
225 origFileNameView.substr(close + 1));
226 ATH_MSG_DEBUG("Output file: " << m_lastFileName);
227 }
228 }
229 }
230
231 if( m_reportingOn.value() ) {
232 m_fnToRangeId.insert( std::pair(m_lastFileName, rangeID) );
233 }
234
235 return m_lastFileName;
236}
237
238
240{
241 if( !inUse() ) return "";
242 const EventContext& ctx = Gaudi::Hive::currentContext();
243 return *m_rangeIDinSlot.get(ctx);
244}
245
246
247std::string OutputStreamSequencerSvc::setRangeID(const std::string & rangeID)
248{
249 const EventContext& ctx = Gaudi::Hive::currentContext();
250 std::string* rangeid = m_rangeIDinSlot.get(ctx);
251 const std::string oldrange = *rangeid;
252 *rangeid = rangeID;
253 return oldrange;
254}
255
256
257void OutputStreamSequencerSvc::publishRangeReport(const std::string& outputFile)
258{
259 std::lock_guard lockg( m_mutex );
260 m_finishedRange = m_fnToRangeId.find(outputFile);
261}
262
264{
265 RangeReport_ptr report;
266 if( !m_reportingOn.value() ) {
267 ATH_MSG_WARNING("Reporting not turned on - set " << m_reportingOn.name() << " to True");
268 } else {
269 std::lock_guard lockg( m_mutex );
270 if(m_finishedRange!=m_fnToRangeId.end()) {
271 report = std::make_unique<RangeReport_t>(m_finishedRange->second,m_finishedRange->first);
274 }
275 }
276 return report;
277}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the MetaDataSvc class.
This file contains the class definition for the OutputStreamSequencerSvc class.
static const Attributes_t empty
void publishRangeReport(const std::string &outputFile)
SG::SlotSpecificObj< std::string, SG::InvalidSlot::Enabled > m_rangeIDinSlot
EventRange ID for all slots.
bool inUse() const
Is the service in active use? (true after the first range incident is handled).
OutputStreamSequencerSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
virtual void handle(const Incident &) override final
Incident service handle.
std::string currentRangeID() const
The current Event Range ID (only one range is returned).
std::string m_lastFileName
Recently constructed full file name (useful in single threaded processing).
std::string setRangeID(const std::string &rangeID)
set the RangeID (possibly temporarily) so the right Range Filename may be generated
ServiceHandle< MetaDataSvc > m_metaDataSvc
int m_fileSequenceNumber
The event sequence number.
BooleanProperty m_replaceRangeMode
Flag to put in ReplaceRangeMode (i.e.
virtual StatusCode finalize() override final
Required of all Gaudi services:
std::string incidentName() const
The name of the incident that starts a new event sequence.
std::string buildSequenceFileName(const std::string &)
Returns sequenced file name for output stream.
std::string m_currentRangeID
Current EventRange ID constructed on the last NextRange incident.
virtual ~OutputStreamSequencerSvc()
Destructor.
BooleanProperty m_reportingOn
Flag to switch on storage of reporting info in fnToRangeId.
static bool inConcurrentEventsMode()
Are there concurrent events? (threads>1).
std::string m_lastIncident
Last incident type that was handled.
std::unique_ptr< RangeReport_t > RangeReport_ptr
std::map< std::string, std::string >::iterator m_finishedRange
virtual StatusCode initialize() override final
Required of all Gaudi services:
std::map< std::string, std::string > m_fnToRangeId