ATLAS Offline Software
OutputStreamSequencerSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
11 #include "MetaDataSvc.h"
12 
13 #include "GaudiKernel/IIncidentSvc.h"
14 #include "GaudiKernel/FileIncident.h"
15 #include "GaudiKernel/ConcurrencyFlags.h"
16 
17 #include <sstream>
18 
19 //________________________________________________________________________________
20 OutputStreamSequencerSvc::OutputStreamSequencerSvc(const std::string& name, ISvcLocator* pSvcLocator)
21  : base_class(name, pSvcLocator),
22  m_metaDataSvc("MetaDataSvc", name),
23  m_fileSequenceNumber(-1)
24 {
25 }
26 
27 //__________________________________________________________________________
29 }
30 //__________________________________________________________________________
// Body of OutputStreamSequencerSvc::initialize().
// NOTE(review): the definition header is missing from this listing; the
// cross-reference section of this page places it on listing line 31.
//
// Retrieves the IncidentSvc and, only when incidentName() is non-empty,
// registers this service as listener for that incident and for
// BeginProcessing, then makes sure the MetaDataSvc handle is usable.
// Returns FAILURE if either service cannot be obtained, SUCCESS otherwise.
32  ATH_MSG_DEBUG("Initializing " << name());
33 
34  // Get the IncidentSvc so this service can register itself as an incident listener
35  ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name());
36  if (!incsvc.retrieve().isSuccess()) {
37  ATH_MSG_FATAL("Cannot get IncidentSvc.");
38  return(StatusCode::FAILURE);
39  }
// An empty incident name leaves the service unregistered: no listeners are
// added and MetaDataSvc is not retrieved.
40  if( !incidentName().empty() ) {
// NOTE(review): the third argument (100) is presumably the listener priority - confirm against IIncidentSvc::addListener
41  incsvc->addListener(this, incidentName(), 100);
42  incsvc->addListener(this, IncidentType::BeginProcessing, 100);
43  ATH_MSG_DEBUG("Listening to " << incidentName() << " incidents" );
44  ATH_MSG_DEBUG("Reporting is " << (m_reportingOn.value()? "ON" : "OFF") );
45  // Retrieve MetaDataSvc (skipped when the handle is already valid)
46  if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) {
47  ATH_MSG_ERROR("Cannot get MetaDataSvc");
48  return StatusCode::FAILURE;
49  }
50  }
51 
// Purely informational: report which event-processing mode was detected
52  if( inConcurrentEventsMode() ) {
53  ATH_MSG_DEBUG("Concurrent events mode");
54  } else {
55  ATH_MSG_VERBOSE("Sequential events mode");
56  }
57 
// The per-slot vector is deliberately not sized here; handle() resizes it lazily.
58  // Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() not set yet
59  // m_rangeIDinSlot.resize( );
// NOTE(review): listing line 60 is absent from this extract.
61 
62  return(StatusCode::SUCCESS);
63 }
64 //__________________________________________________________________________
// Body of OutputStreamSequencerSvc::finalize().
// NOTE(review): the definition header (listing line 65 per the cross-reference
// section) is missing from this listing.
// A failed release of the MetaDataSvc handle is only reported as a warning;
// the function returns SUCCESS in every case.
66  // Release MetaDataSvc
67  if (!m_metaDataSvc.release().isSuccess()) {
68  ATH_MSG_WARNING("Cannot release MetaDataSvc.");
69  }
70  return(StatusCode::SUCCESS);
71 }
72 
73 //__________________________________________________________________________
// Body of OutputStreamSequencerSvc::inConcurrentEventsMode().
// NOTE(review): the definition header (listing line 74 per the cross-reference
// section) is missing from this listing.
// True when Gaudi reports more than one concurrent event.
75  return Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() > 1;
76 }
77 
78 //__________________________________________________________________________
// Body of OutputStreamSequencerSvc::inUse().
// NOTE(review): the definition header (listing line 79 per the cross-reference
// section) is missing from this listing.
// The counter is constructed as -1, so this is false until it has been advanced.
80  return m_fileSequenceNumber >= 0;
81 }
82 
83 //__________________________________________________________________________
84 void OutputStreamSequencerSvc::handle(const Incident& inc)
85 {
86  auto slot = Gaudi::Hive::currentContext().slot();
87  bool has_context = ( slot != EventContext::INVALID_CONTEXT_ID );
88  // in AthenaSP there is no context so go with the first slot
89  if( !has_context ) slot = 0;
90  m_lastIncident = inc.type();
91  ATH_MSG_INFO("Handling incident of type " << m_lastIncident << " for slot=" << slot
92  << (!has_context? " NO event context":"") );
93 
94  if( inc.type() == incidentName() ) { // NextEventRange
95  std::string rangeID;
96  const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
97  if (fileInc != nullptr) {
98  rangeID = fileInc->fileName();
99  ATH_MSG_DEBUG("Requested (through incident) Next Event Range filename extension: " << rangeID);
100  }
101 
102  if( rangeID == "dummy" ) {
103  if( not inConcurrentEventsMode() ) {
104  // finish the previous Range here only in SEQUENTIAL (threads<2) event processing
105  // Write metadata on the incident finishing a Range (filename=="dummy") in ES MP
106  ATH_MSG_DEBUG("MetaData transition");
107  // immediate write and disconnect for ES, otherwise do it after Event write is done
108  bool disconnect { true };
109  if( !m_metaDataSvc->transitionMetaDataFile( m_lastFileName, disconnect ).isSuccess() ) {
110  throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
111  }
112  }
113  // exit now, wait for the next (real) incident that will start the next range
114  return;
115  }
116  {
117  // start a new range
118  std::lock_guard lockg( m_mutex );
120  if( rangeID.empty() ) {
121  std::ostringstream n;
122  n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber;
123  rangeID = n.str();
124  ATH_MSG_DEBUG("Default next event range filename extension: " << rangeID);
125  }
126  if( slot >= m_rangeIDinSlot.size() ) {
127  // MN - late resize, is there a better place for it?
128  m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
129  }
130  // from now on new events will use the new rangeID
131  m_currentRangeID = rangeID;
132  // for ESMT these incidents are asynchronous, so wait for BeginProcessing to update the range map
133  if( not inConcurrentEventsMode() or has_context ) {
134  m_rangeIDinSlot[ slot ] = rangeID;
135  }
136  }
137  if( not inConcurrentEventsMode() and not fileInc ) {
138  // non-file incident case (filename=="") in regular SP LoopMgr
139  ATH_MSG_DEBUG("MetaData transition");
140  bool disconnect { false };
141  // MN: may not know the full filename yet, but that is only needed for disconnect==true
142  if( !m_metaDataSvc->transitionMetaDataFile( "" /*m_lastFileName*/, disconnect ).isSuccess() ) {
143  throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
144  }
145  }
146  }
147  else if( inc.type() == IncidentType::BeginProcessing ) {
148  // new event start - assing current rangeId to its slot
149  ATH_MSG_DEBUG("Assigning rangeID = " << m_currentRangeID << " to slot " << slot);
150  std::lock_guard lockg( m_mutex );
151  // If this service is enabled but not getting NextRange incidents, need to resize here
152  if( slot >= m_rangeIDinSlot.size() ) {
153  m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
154  }
156  }
157 }
158 
159 //__________________________________________________________________________
160 std::string OutputStreamSequencerSvc::buildSequenceFileName(const std::string& orgFileName)
161 {
162  if( !inUse() ) {
163  // Event sequences not in use, just return the original filename
164  return orgFileName;
165  }
166  std::string rangeID = currentRangeID();
167  std::lock_guard lockg( m_mutex );
168  // build the full output file name for this event range
169  std::string fileNameCore = orgFileName, fileNameExt;
170  std::size_t sepPos = orgFileName.find('[');
171  if (sepPos != std::string::npos) {
172  fileNameCore = orgFileName.substr(0, sepPos);
173  fileNameExt = orgFileName.substr(sepPos);
174  }
175  std::ostringstream n;
176  n << fileNameCore << "." << rangeID << fileNameExt;
177  m_lastFileName = n.str();
178 
179  if( m_reportingOn.value() ) {
180  m_fnToRangeId.insert( std::pair(m_lastFileName, rangeID) );
181  }
182 
183  return m_lastFileName;
184 }
185 
186 
// Body of OutputStreamSequencerSvc::currentRangeID().
// NOTE(review): the definition header (listing line 187 per the cross-reference
// section) is missing from this listing.
// Returns the range ID stored for the calling slot, or an empty string when
// the service is not in use or no entry exists for that slot yet.
188 {
189  if( !inUse() ) return "";
190  auto slot = Gaudi::Hive::currentContext().slot();
// no valid event context: fall back to slot 0
191  if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
// m_mutex is held while the per-slot vector is read
192  std::lock_guard lockg( m_mutex );
193  if( slot >= m_rangeIDinSlot.size() ) return "";
194  return m_rangeIDinSlot[ slot ];
195 }
196 
197 
198 std::string OutputStreamSequencerSvc::setRangeID(const std::string & rangeID)
199 {
200  auto slot = Gaudi::Hive::currentContext().slot();
201  if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
202  std::lock_guard lockg( m_mutex );
203  if( slot >= m_rangeIDinSlot.size() ) {
204  throw std::runtime_error("OutputStreamSequencer::setRangeID(): slot out of range");
205  }
206  std::string oldrange = m_rangeIDinSlot[ slot ];
207  m_rangeIDinSlot[ slot ] = rangeID;
208  return oldrange;
209 }
210 
211 
// Body of OutputStreamSequencerSvc::publishRangeReport(const std::string& outputFile).
// NOTE(review): the definition header (listing line 212 per the cross-reference
// section) and listing line 215 are missing from this listing, so the only
// visible action is taking m_mutex.
213 {
214  std::lock_guard lockg( m_mutex );
216 }
217 
// Body of OutputStreamSequencerSvc::getRangeReport().
// NOTE(review): the definition header (listing line 218 per the cross-reference
// section) and listing lines 220, 227 and 228 are missing from this listing;
// in particular the declaration of `report` is not visible here.
//
// With reporting switched off only a warning is issued. Otherwise, under
// m_mutex, a RangeReport_t is created from the entry m_finishedRange points
// to, provided that iterator is not the end of m_fnToRangeId.
219 {
221  if( !m_reportingOn.value() ) {
222  ATH_MSG_WARNING("Reporting not turned on - set " << m_reportingOn.name() << " to True");
223  } else {
224  std::lock_guard lockg( m_mutex );
225  if(m_finishedRange!=m_fnToRangeId.end()) {
// map entries are (file name -> range ID), so the report is built as (range ID, file name)
226  report = std::make_unique<RangeReport_t>(m_finishedRange->second,m_finishedRange->first);
229  }
230  }
231  return report;
232 }
OutputStreamSequencerSvc::incidentName
std::string incidentName() const
The name of the incident that starts a new event sequence.
Definition: OutputStreamSequencerSvc.h:59
OutputStreamSequencerSvc::inConcurrentEventsMode
static bool inConcurrentEventsMode()
Are there concurrent events? (threads>1)
Definition: OutputStreamSequencerSvc.cxx:74
OutputStreamSequencerSvc::m_lastFileName
std::string m_lastFileName
Recently constructed full file name (useful in single threaded processing)
Definition: OutputStreamSequencerSvc.h:86
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
OutputStreamSequencerSvc::m_fnToRangeId
std::map< std::string, std::string > m_fnToRangeId
Definition: OutputStreamSequencerSvc.h:102
max
constexpr double max()
Definition: ap_fixedTest.cxx:33
OutputStreamSequencerSvc.h
This file contains the class definition for the OutputStreamSequencerSvc class.
OutputStreamSequencerSvc::OutputStreamSequencerSvc
OutputStreamSequencerSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Definition: OutputStreamSequencerSvc.cxx:20
checkTP.report
report
Definition: checkTP.py:127
OutputStreamSequencerSvc::inUse
bool inUse() const
Is the service in active use? (true after the first range incident is handled)
Definition: OutputStreamSequencerSvc.cxx:79
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
empty
bool empty(TH1 *h)
Definition: computils.cxx:295
OutputStreamSequencerSvc::m_mutex
std::mutex m_mutex
Definition: OutputStreamSequencerSvc.h:105
OutputStreamSequencerSvc::RangeReport_ptr
std::unique_ptr< RangeReport_t > RangeReport_ptr
Definition: OutputStreamSequencerSvc.h:35
compareGeometries.outputFile
string outputFile
Definition: compareGeometries.py:25
OutputStreamSequencerSvc::setRangeID
std::string setRangeID(const std::string &rangeID)
set the RangeID (possibly temporarily) so the right Range Filename may be generated
Definition: OutputStreamSequencerSvc.cxx:198
OutputStreamSequencerSvc::m_lastIncident
std::string m_lastIncident
Last incident type that was handled.
Definition: OutputStreamSequencerSvc.h:89
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
OutputStreamSequencerSvc::initialize
virtual StatusCode initialize() override final
Required of all Gaudi services:
Definition: OutputStreamSequencerSvc.cxx:31
beamspotman.n
n
Definition: beamspotman.py:731
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
OutputStreamSequencerSvc::m_reportingOn
BooleanProperty m_reportingOn
Flag to switch on storage of reporting info in fnToRangeId.
Definition: OutputStreamSequencerSvc.h:99
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
OutputStreamSequencerSvc::m_metaDataSvc
ServiceHandle< MetaDataSvc > m_metaDataSvc
Definition: OutputStreamSequencerSvc.h:77
OutputStreamSequencerSvc::m_rangeIDinSlot
std::vector< std::string > m_rangeIDinSlot
EventRange ID for all slots.
Definition: OutputStreamSequencerSvc.h:92
OutputStreamSequencerSvc::currentRangeID
std::string currentRangeID() const
The current Event Range ID (only one range is returned)
Definition: OutputStreamSequencerSvc.cxx:187
OutputStreamSequencerSvc::m_currentRangeID
std::string m_currentRangeID
Current EventRange ID constructed on the last NextRange incident.
Definition: OutputStreamSequencerSvc.h:83
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
OutputStreamSequencerSvc::handle
virtual void handle(const Incident &) override final
Incident service handle.
Definition: OutputStreamSequencerSvc.cxx:84
OutputStreamSequencerSvc::getRangeReport
RangeReport_ptr getRangeReport()
Definition: OutputStreamSequencerSvc.cxx:218
OutputStreamSequencerSvc::m_fileSequenceNumber
int m_fileSequenceNumber
The event sequence number.
Definition: OutputStreamSequencerSvc.h:80
MetaDataSvc.h
This file contains the class definition for the MetaDataSvc class.
OutputStreamSequencerSvc::m_finishedRange
std::map< std::string, std::string >::iterator m_finishedRange
Definition: OutputStreamSequencerSvc.h:103
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
OutputStreamSequencerSvc::publishRangeReport
void publishRangeReport(const std::string &outputFile)
Definition: OutputStreamSequencerSvc.cxx:212
OutputStreamSequencerSvc::finalize
virtual StatusCode finalize() override final
Required of all Gaudi services:
Definition: OutputStreamSequencerSvc.cxx:65
OutputStreamSequencerSvc::~OutputStreamSequencerSvc
virtual ~OutputStreamSequencerSvc()
Destructor.
Definition: OutputStreamSequencerSvc.cxx:28
OutputStreamSequencerSvc::buildSequenceFileName
std::string buildSequenceFileName(const std::string &)
Returns sequenced file name for output stream.
Definition: OutputStreamSequencerSvc.cxx:160
ServiceHandle< IIncidentSvc >