ATLAS Offline Software
OutputStreamSequencerSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
11 #include "MetaDataSvc.h"
12 
13 #include "GaudiKernel/IIncidentSvc.h"
14 #include "GaudiKernel/FileIncident.h"
15 #include "GaudiKernel/ConcurrencyFlags.h"
16 
17 #include <charconv>
18 #include <format>
19 #include <string_view>
20 #include <sstream>
21 
22 
23 //________________________________________________________________________________
24 OutputStreamSequencerSvc::OutputStreamSequencerSvc(const std::string& name, ISvcLocator* pSvcLocator)
25  : base_class(name, pSvcLocator),
26  m_metaDataSvc("MetaDataSvc", name),
27  m_fileSequenceNumber(-1)
28 {
29 }
30 
31 //__________________________________________________________________________
33 }
34 //__________________________________________________________________________
36  ATH_MSG_DEBUG("Initializing " << name());
37 
38  // Set to be listener for end of event
39  ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name());
40  if (!incsvc.retrieve().isSuccess()) {
41  ATH_MSG_FATAL("Cannot get IncidentSvc.");
42  return(StatusCode::FAILURE);
43  }
44  if( !incidentName().empty() ) {
45  incsvc->addListener(this, incidentName(), 100);
46  incsvc->addListener(this, IncidentType::BeginProcessing, 100);
47  ATH_MSG_DEBUG("Listening to " << incidentName() << " incidents" );
48  ATH_MSG_DEBUG("Reporting is " << (m_reportingOn.value()? "ON" : "OFF") );
49  // Retrieve MetaDataSvc
50  if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) {
51  ATH_MSG_ERROR("Cannot get MetaDataSvc");
52  return StatusCode::FAILURE;
53  }
54  }
55 
56  if( inConcurrentEventsMode() ) {
57  ATH_MSG_DEBUG("Concurrent events mode");
58  } else {
59  ATH_MSG_VERBOSE("Sequential events mode");
60  }
61 
62  // Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() not set yet
63  // m_rangeIDinSlot.resize( );
64  std::lock_guard lockg( m_mutex );
66 
67  return(StatusCode::SUCCESS);
68 }
69 //__________________________________________________________________________
71  // Release MetaDataSvc
72  if (!m_metaDataSvc.release().isSuccess()) {
73  ATH_MSG_WARNING("Cannot release MetaDataSvc.");
74  }
75  return(StatusCode::SUCCESS);
76 }
77 
78 //__________________________________________________________________________
80  return Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() > 1;
81 }
82 
83 //__________________________________________________________________________
85  std::lock_guard lockg( m_mutex );
86  return m_fileSequenceNumber >= 0;
87 }
88 
89 //__________________________________________________________________________
90 void OutputStreamSequencerSvc::handle(const Incident& inc)
91 {
92  auto slot = Gaudi::Hive::currentContext().slot();
93  bool has_context = ( slot != EventContext::INVALID_CONTEXT_ID );
94  // in AthenaSP there is no context so go with the first slot
95  if( !has_context ) slot = 0;
96  m_lastIncident = inc.type();
97  ATH_MSG_INFO("Handling incident of type " << m_lastIncident << " for slot=" << slot
98  << (!has_context? " NO event context":"") );
99 
100  if( inc.type() == incidentName() ) { // NextEventRange
101  std::string rangeID;
102  const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
103  if (fileInc != nullptr) {
104  rangeID = fileInc->fileName();
105  // Handle BeginInputFile
106  if (inc.type() == IncidentType::BeginInputFile) {
107  rangeID = "INFILE";
108  }
110  "Requested (through incident) Next Event Range filename extension: "
111  << rangeID);
112  }
113 
114  if( rangeID == "dummy" ) {
115  if( not inConcurrentEventsMode() ) {
116  // finish the previous Range here only in SEQUENTIAL (threads<2) event processing
117  // Write metadata on the incident finishing a Range (filename=="dummy") in ES MP
118  ATH_MSG_DEBUG("MetaData transition");
119  // immediate write and disconnect for ES, otherwise do it after Event write is done
120  bool disconnect { true };
121  std::lock_guard lockg( m_mutex );
122  if( !m_metaDataSvc->transitionMetaDataFile( m_lastFileName, disconnect ).isSuccess() ) {
123  throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
124  }
125  }
126  // exit now, wait for the next (real) incident that will start the next range
127  return;
128  }
129  {
130  // start a new range
131  std::lock_guard lockg( m_mutex );
133  if( rangeID.empty() ) {
134  std::ostringstream n;
135  n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber;
136  rangeID = n.str();
137  ATH_MSG_DEBUG("Default next event range filename extension: " << rangeID);
138  }
139  else if (rangeID == "INFILE") {
141  }
142  if( slot >= m_rangeIDinSlot.size() ) {
143  // MN - late resize, is there a better place for it?
144  m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
145  }
146  // from now on new events will use the new rangeID
147  m_currentRangeID = rangeID;
148  // for ESMT these incidents are asynchronous, so wait for BeginProcessing to update the range map
149  if( not inConcurrentEventsMode() or has_context ) {
150  m_rangeIDinSlot[ slot ] = std::move(rangeID);
151  }
152  }
153  if( not inConcurrentEventsMode() and not fileInc ) {
154  // non-file incident case (filename=="") in regular SP LoopMgr
155  ATH_MSG_DEBUG("MetaData transition");
156  bool disconnect { false };
157  // MN: may not know the full filename yet, but that is only needed for disconnect==true
158  if( !m_metaDataSvc->transitionMetaDataFile( "" /*m_lastFileName*/, disconnect ).isSuccess() ) {
159  throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
160  }
161  }
162  }
163  else if( inc.type() == IncidentType::BeginProcessing ) {
164  // new event start - assing current rangeId to its slot
165  std::lock_guard lockg( m_mutex );
166  ATH_MSG_DEBUG("Assigning rangeID = " << m_currentRangeID << " to slot " << slot);
167  // If this service is enabled but not getting NextRange incidents, need to resize here
168  if( slot >= m_rangeIDinSlot.size() ) {
169  m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
170  }
172  }
173 }
174 
175 //__________________________________________________________________________
176 std::string OutputStreamSequencerSvc::buildSequenceFileName(const std::string& orgFileName)
177 {
178  if( !inUse() ) {
179  // Event sequences not in use, just return the original filename
180  return orgFileName;
181  }
182  std::string rangeID = currentRangeID();
183  std::lock_guard lockg( m_mutex );
184  if (!m_replaceRangeMode) {
185  // build the full output file name for this event range
186  std::string fileNameCore = orgFileName, fileNameExt;
187  std::size_t sepPos = orgFileName.find('[');
188  if (sepPos != std::string::npos) {
189  fileNameCore = orgFileName.substr(0, sepPos);
190  fileNameExt = orgFileName.substr(sepPos);
191  }
192  std::ostringstream n;
193  n << fileNameCore << "." << rangeID << fileNameExt;
194  m_lastFileName = n.str();
195  } else {
196  std::string_view origFileNameView = orgFileName;
197  std::size_t open = origFileNameView.find('[');
198  std::size_t close = origFileNameView.find(']');
199  // If we don't find a [ ] enclosed section, just append the rangeID to the
200  // end
201  if (open == std::string_view::npos || close == std::string_view::npos) {
202  m_lastFileName = std::format("{}.{}", origFileNameView, rangeID);
203  } else {
204  // build list of elems to substitute from
205  ATH_MSG_DEBUG("Building element list");
206  std::vector<std::string_view> elems{};
207  std::size_t pos = open + 1;
208  for (std::size_t comma = origFileNameView.find(',', pos);
209  comma < close;
210  comma = origFileNameView.find(',', pos)) {
211  std::string_view item = origFileNameView.substr(pos, comma - pos);
212  ATH_MSG_DEBUG("(start) pos = " << pos << ", (end) comma = " << comma << ", item = " << item);
213  elems.push_back(item);
214  pos = comma + 1;
215  }
216  std::string_view last_item = origFileNameView.substr(pos, close - pos);
217  ATH_MSG_DEBUG("(start) pos = " << pos << ", (end) close = " << close << ", item = " << last_item);
218  elems.push_back(last_item);
219  // substitute
220  std::size_t rangeIdx{};
221  auto rangeIdxParseRes = std::from_chars(
222  rangeID.data(), rangeID.data() + rangeID.size(), rangeIdx);
223  if (rangeIdxParseRes.ec != std::errc()) {
225  "Error parsing rangeID to integer. Replacing [] list with "
226  "rangeID.");
228  std::format("{}{}{}", origFileNameView.substr(0, open), rangeID,
229  origFileNameView.substr(close + 1));
230  } else if (rangeIdx >= elems.size()) {
232  "Number of elements in [] list <= rangeID. Replacing [] list with "
233  "rangeID.");
235  std::format("{}{}{}", origFileNameView.substr(0, open), rangeID,
236  origFileNameView.substr(close + 1));
237  } else {
239  "{}{}{}", origFileNameView.substr(0, open), elems.at(rangeIdx),
240  origFileNameView.substr(close + 1));
241  ATH_MSG_DEBUG("Output file: " << m_lastFileName);
242  }
243  }
244  }
245 
246  if( m_reportingOn.value() ) {
247  m_fnToRangeId.insert( std::pair(m_lastFileName, rangeID) );
248  }
249 
250  return m_lastFileName;
251 }
252 
253 
255 {
256  if( !inUse() ) return "";
257  auto slot = Gaudi::Hive::currentContext().slot();
258  if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
259  std::lock_guard lockg( m_mutex );
260  if( slot >= m_rangeIDinSlot.size() ) return "";
261  return m_rangeIDinSlot[ slot ];
262 }
263 
264 
265 std::string OutputStreamSequencerSvc::setRangeID(const std::string & rangeID)
266 {
267  auto slot = Gaudi::Hive::currentContext().slot();
268  if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
269  std::lock_guard lockg( m_mutex );
270  if( slot >= m_rangeIDinSlot.size() ) {
271  throw std::runtime_error("OutputStreamSequencer::setRangeID(): slot out of range");
272  }
273  std::string oldrange = m_rangeIDinSlot[ slot ];
274  m_rangeIDinSlot[ slot ] = rangeID;
275  return oldrange;
276 }
277 
278 
280 {
281  std::lock_guard lockg( m_mutex );
283 }
284 
286 {
288  if( !m_reportingOn.value() ) {
289  ATH_MSG_WARNING("Reporting not turned on - set " << m_reportingOn.name() << " to True");
290  } else {
291  std::lock_guard lockg( m_mutex );
292  if(m_finishedRange!=m_fnToRangeId.end()) {
293  report = std::make_unique<RangeReport_t>(m_finishedRange->second,m_finishedRange->first);
296  }
297  }
298  return report;
299 }
OutputStreamSequencerSvc::incidentName
std::string incidentName() const
The name of the incident that starts a new event sequence.
Definition: OutputStreamSequencerSvc.h:59
OutputStreamSequencerSvc::inConcurrentEventsMode
static bool inConcurrentEventsMode()
Are there concurrent events? (threads>1)
Definition: OutputStreamSequencerSvc.cxx:79
OutputStreamSequencerSvc::m_lastFileName
std::string m_lastFileName
Recently constructed full file name (useful in single threaded processing)
Definition: OutputStreamSequencerSvc.h:86
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
vtune_athena.format
format
Definition: vtune_athena.py:14
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
OutputStreamSequencerSvc::m_fnToRangeId
std::map< std::string, std::string > m_fnToRangeId
Definition: OutputStreamSequencerSvc.h:110
max
constexpr double max()
Definition: ap_fixedTest.cxx:33
OutputStreamSequencerSvc.h
This file contains the class definition for the OutputStreamSequencerSvc class.
OutputStreamSequencerSvc::OutputStreamSequencerSvc
OutputStreamSequencerSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Definition: OutputStreamSequencerSvc.cxx:24
checkTP.report
report
Definition: checkTP.py:125
OutputStreamSequencerSvc::inUse
bool inUse() const
Is the service in active use? (true after the first range incident is handled)
Definition: OutputStreamSequencerSvc.cxx:84
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
OutputStreamSequencerSvc::m_mutex
std::mutex m_mutex
Definition: OutputStreamSequencerSvc.h:113
OutputStreamSequencerSvc::RangeReport_ptr
std::unique_ptr< RangeReport_t > RangeReport_ptr
Definition: OutputStreamSequencerSvc.h:35
compareGeometries.outputFile
string outputFile
Definition: compareGeometries.py:25
OutputStreamSequencerSvc::setRangeID
std::string setRangeID(const std::string &rangeID)
set the RangeID (possibly temporarily) so the right Range Filename may be generated
Definition: OutputStreamSequencerSvc.cxx:265
OutputStreamSequencerSvc::m_lastIncident
std::string m_lastIncident
Last incident type that was handled.
Definition: OutputStreamSequencerSvc.h:89
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
OutputStreamSequencerSvc::initialize
virtual StatusCode initialize() override final
Required of all Gaudi services:
Definition: OutputStreamSequencerSvc.cxx:35
beamspotman.n
n
Definition: beamspotman.py:729
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
OutputStreamSequencerSvc::m_reportingOn
BooleanProperty m_reportingOn
Flag to switch on storage of reporting info in fnToRangeId.
Definition: OutputStreamSequencerSvc.h:99
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
OutputStreamSequencerSvc::m_metaDataSvc
ServiceHandle< MetaDataSvc > m_metaDataSvc
Definition: OutputStreamSequencerSvc.h:77
OutputStreamSequencerSvc::m_rangeIDinSlot
std::vector< std::string > m_rangeIDinSlot
EventRange ID for all slots.
Definition: OutputStreamSequencerSvc.h:92
OutputStreamSequencerSvc::currentRangeID
std::string currentRangeID() const
The current Event Range ID (only one range is returned)
Definition: OutputStreamSequencerSvc.cxx:254
OutputStreamSequencerSvc::m_currentRangeID
std::string m_currentRangeID
Current EventRange ID constructed on the last NextRange incident.
Definition: OutputStreamSequencerSvc.h:83
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
ActsTrk::to_string
std::string to_string(const DetectorType &type)
Definition: GeometryDefs.h:34
item
Definition: ItemListSvc.h:43
OutputStreamSequencerSvc::m_replaceRangeMode
BooleanProperty m_replaceRangeMode
Flag to put in ReplaceRangeMode (i.e.
Definition: OutputStreamSequencerSvc.h:104
python.LumiBlobConversion.pos
pos
Definition: LumiBlobConversion.py:16
OutputStreamSequencerSvc::handle
virtual void handle(const Incident &) override final
Incident service handle.
Definition: OutputStreamSequencerSvc.cxx:90
OutputStreamSequencerSvc::getRangeReport
RangeReport_ptr getRangeReport()
Definition: OutputStreamSequencerSvc.cxx:285
OutputStreamSequencerSvc::m_fileSequenceNumber
int m_fileSequenceNumber
The event sequence number.
Definition: OutputStreamSequencerSvc.h:80
Trk::open
@ open
Definition: BinningType.h:40
MetaDataSvc.h
This file contains the class definition for the MetaDataSvc class.
OutputStreamSequencerSvc::m_finishedRange
std::map< std::string, std::string >::iterator m_finishedRange
Definition: OutputStreamSequencerSvc.h:111
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
OutputStreamSequencerSvc::publishRangeReport
void publishRangeReport(const std::string &outputFile)
Definition: OutputStreamSequencerSvc.cxx:279
OutputStreamSequencerSvc::finalize
virtual StatusCode finalize() override final
Required of all Gaudi services:
Definition: OutputStreamSequencerSvc.cxx:70
OutputStreamSequencerSvc::~OutputStreamSequencerSvc
virtual ~OutputStreamSequencerSvc()
Destructor.
Definition: OutputStreamSequencerSvc.cxx:32
OutputStreamSequencerSvc::buildSequenceFileName
std::string buildSequenceFileName(const std::string &)
Returns sequenced file name for output stream.
Definition: OutputStreamSequencerSvc.cxx:176
ServiceHandle< IIncidentSvc >