ATLAS Offline Software
OutputStreamSequencerSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
3 */
4 
11 #include "MetaDataSvc.h"
12 
13 #include "GaudiKernel/IIncidentSvc.h"
14 #include "GaudiKernel/FileIncident.h"
15 #include "GaudiKernel/ConcurrencyFlags.h"
16 
17 #include <sstream>
18 
19 //________________________________________________________________________________
20 OutputStreamSequencerSvc::OutputStreamSequencerSvc(const std::string& name, ISvcLocator* pSvcLocator) : ::AthService(name, pSvcLocator),
21  m_metaDataSvc("MetaDataSvc", name),
22  m_fileSequenceNumber(-1)
23 {
24 }
25 
26 //__________________________________________________________________________
28 }
29 //__________________________________________________________________________
31  ATH_MSG_DEBUG("Initializing " << name());
32 
33  // Set to be listener for end of event
34  ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name());
35  if (!incsvc.retrieve().isSuccess()) {
36  ATH_MSG_FATAL("Cannot get IncidentSvc.");
37  return(StatusCode::FAILURE);
38  }
39  if( !incidentName().empty() ) {
40  incsvc->addListener(this, incidentName(), 100);
41  incsvc->addListener(this, IncidentType::BeginProcessing, 100);
42  ATH_MSG_DEBUG("Listening to " << incidentName() << " incidents" );
43  ATH_MSG_DEBUG("Reporting is " << (m_reportingOn.value()? "ON" : "OFF") );
44  // Retrieve MetaDataSvc
45  if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) {
46  ATH_MSG_ERROR("Cannot get MetaDataSvc");
47  return StatusCode::FAILURE;
48  }
49  }
50 
51  if( inConcurrentEventsMode() ) {
52  ATH_MSG_DEBUG("Concurrent events mode");
53  } else {
54  ATH_MSG_VERBOSE("Sequential events mode");
55  }
56 
57  // Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() not set yet
58  // m_rangeIDinSlot.resize( );
60 
61  return(StatusCode::SUCCESS);
62 }
63 //__________________________________________________________________________
65  // Release MetaDataSvc
66  if (!m_metaDataSvc.release().isSuccess()) {
67  ATH_MSG_WARNING("Cannot release MetaDataSvc.");
68  }
69  return(StatusCode::SUCCESS);
70 }
71 //_______________________________________________________________________
72 StatusCode OutputStreamSequencerSvc::queryInterface(const InterfaceID& riid, void** ppvInterface) {
73  if (riid == this->interfaceID()) {
74  *ppvInterface = this;
75  } else {
76  // Interface is not directly available: try out a base class
77  return(::AthService::queryInterface(riid, ppvInterface));
78  }
79  addRef();
80  return(StatusCode::SUCCESS);
81 }
82 
83 //__________________________________________________________________________
85  return Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() > 1;
86 }
87 
88 //__________________________________________________________________________
90  return m_fileSequenceNumber >= 0;
91 }
92 
93 //__________________________________________________________________________
94 void OutputStreamSequencerSvc::handle(const Incident& inc)
95 {
96  auto slot = Gaudi::Hive::currentContext().slot();
97  bool has_context = ( slot != EventContext::INVALID_CONTEXT_ID );
98  // in AthenaSP there is no context so go with the first slot
99  if( !has_context ) slot = 0;
100  m_lastIncident = inc.type();
101  ATH_MSG_INFO("Handling incident of type " << m_lastIncident << " for slot=" << slot
102  << (!has_context? " NO event context":"") );
103 
104  if( inc.type() == incidentName() ) { // NextEventRange
105  std::string rangeID;
106  const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
107  if (fileInc != nullptr) {
108  rangeID = fileInc->fileName();
109  ATH_MSG_DEBUG("Requested (through incident) Next Event Range filename extension: " << rangeID);
110  }
111 
112  if( rangeID == "dummy" ) {
113  if( not inConcurrentEventsMode() ) {
114  // finish the previous Range here only in SEQUENTIAL (threads<2) event processing
115  // Write metadata on the incident finishing a Range (filename=="dummy") in ES MP
116  ATH_MSG_DEBUG("MetaData transition");
117  // immediate write and disconnect for ES, otherwise do it after Event write is done
118  bool disconnect { true };
119  if( !m_metaDataSvc->transitionMetaDataFile( m_lastFileName, disconnect ).isSuccess() ) {
120  throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
121  }
122  }
123  // exit now, wait for the next (real) incident that will start the next range
124  return;
125  }
126  {
127  // start a new range
128  std::lock_guard lockg( m_mutex );
130  if( rangeID.empty() ) {
131  std::ostringstream n;
132  n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber;
133  rangeID = n.str();
134  ATH_MSG_DEBUG("Default next event range filename extension: " << rangeID);
135  }
136  if( slot >= m_rangeIDinSlot.size() ) {
137  // MN - late resize, is there a better place for it?
138  m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
139  }
140  // from now on new events will use the new rangeID
141  m_currentRangeID = rangeID;
142  // for ESMT these incidents are asynchronous, so wait for BeginProcessing to update the range map
143  if( not inConcurrentEventsMode() or has_context ) {
144  m_rangeIDinSlot[ slot ] = rangeID;
145  }
146  }
147  if( not inConcurrentEventsMode() and not fileInc ) {
148  // non-file incident case (filename=="") in regular SP LoopMgr
149  ATH_MSG_DEBUG("MetaData transition");
150  bool disconnect { false };
151  // MN: may not know the full filename yet, but that is only needed for disconnect==true
152  if( !m_metaDataSvc->transitionMetaDataFile( "" /*m_lastFileName*/, disconnect ).isSuccess() ) {
153  throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
154  }
155  }
156  }
157  else if( inc.type() == IncidentType::BeginProcessing ) {
158  // new event start - assing current rangeId to its slot
159  ATH_MSG_DEBUG("Assigning rangeID = " << m_currentRangeID << " to slot " << slot);
160  std::lock_guard lockg( m_mutex );
161  // If this service is enabled but not getting NextRange incidents, need to resize here
162  if( slot >= m_rangeIDinSlot.size() ) {
163  m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
164  }
166  }
167 }
168 
169 //__________________________________________________________________________
170 std::string OutputStreamSequencerSvc::buildSequenceFileName(const std::string& orgFileName)
171 {
172  if( !inUse() ) {
173  // Event sequences not in use, just return the original filename
174  return orgFileName;
175  }
176  std::string rangeID = currentRangeID();
177  std::lock_guard lockg( m_mutex );
178  // build the full output file name for this event range
179  std::string fileNameCore = orgFileName, fileNameExt;
180  std::size_t sepPos = orgFileName.find('[');
181  if (sepPos != std::string::npos) {
182  fileNameCore = orgFileName.substr(0, sepPos);
183  fileNameExt = orgFileName.substr(sepPos);
184  }
185  std::ostringstream n;
186  n << fileNameCore << "." << rangeID << fileNameExt;
187  m_lastFileName = n.str();
188 
189  if( m_reportingOn.value() ) {
190  m_fnToRangeId.insert( std::pair(m_lastFileName, rangeID) );
191  }
192 
193  return m_lastFileName;
194 }
195 
196 
198 {
199  if( !inUse() ) return "";
200  auto slot = Gaudi::Hive::currentContext().slot();
201  if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
202  std::lock_guard lockg( m_mutex );
203  if( slot >= m_rangeIDinSlot.size() ) return "";
204  return m_rangeIDinSlot[ slot ];
205 }
206 
207 
208 std::string OutputStreamSequencerSvc::setRangeID(const std::string & rangeID)
209 {
210  auto slot = Gaudi::Hive::currentContext().slot();
211  if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
212  std::lock_guard lockg( m_mutex );
213  if( slot >= m_rangeIDinSlot.size() ) {
214  throw std::runtime_error("OutputStreamSequencer::setRangeID(): slot out of range");
215  }
216  std::string oldrange = m_rangeIDinSlot[ slot ];
217  m_rangeIDinSlot[ slot ] = rangeID;
218  return oldrange;
219 }
220 
221 
223 {
224  std::lock_guard lockg( m_mutex );
226 }
227 
229 {
231  if( !m_reportingOn.value() ) {
232  ATH_MSG_WARNING("Reporting not turned on - set " << m_reportingOn.name() << " to True");
233  } else {
234  std::lock_guard lockg( m_mutex );
235  if(m_finishedRange!=m_fnToRangeId.end()) {
236  report = std::make_unique<RangeReport_t>(m_finishedRange->second,m_finishedRange->first);
239  }
240  }
241  return report;
242 }
OutputStreamSequencerSvc::incidentName
std::string incidentName() const
The name of the incident that starts a new event sequence.
Definition: OutputStreamSequencerSvc.h:65
OutputStreamSequencerSvc::inConcurrentEventsMode
static bool inConcurrentEventsMode()
Are there concurrent events? (threads>1)
Definition: OutputStreamSequencerSvc.cxx:84
OutputStreamSequencerSvc::m_lastFileName
std::string m_lastFileName
Recently constructed full file name (useful in single threaded processing)
Definition: OutputStreamSequencerSvc.h:92
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
max
#define max(a, b)
Definition: cfImp.cxx:41
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
OutputStreamSequencerSvc::m_fnToRangeId
std::map< std::string, std::string > m_fnToRangeId
Definition: OutputStreamSequencerSvc.h:108
OutputStreamSequencerSvc.h
This file contains the class definition for the OutputStreamSequencerSvc class.
OutputStreamSequencerSvc::OutputStreamSequencerSvc
OutputStreamSequencerSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Definition: OutputStreamSequencerSvc.cxx:20
checkTP.report
report
Definition: checkTP.py:127
OutputStreamSequencerSvc::inUse
bool inUse() const
Is the service in active use? (true after the first range incident is handled)
Definition: OutputStreamSequencerSvc.cxx:89
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
empty
bool empty(TH1 *h)
Definition: computils.cxx:294
OutputStreamSequencerSvc::m_mutex
std::mutex m_mutex
Definition: OutputStreamSequencerSvc.h:111
OutputStreamSequencerSvc::RangeReport_ptr
std::unique_ptr< RangeReport_t > RangeReport_ptr
Definition: OutputStreamSequencerSvc.h:37
compareGeometries.outputFile
string outputFile
Definition: compareGeometries.py:25
OutputStreamSequencerSvc::setRangeID
std::string setRangeID(const std::string &rangeID)
set the RangeID (possibly temporarily) so the right Range Filename may be generated
Definition: OutputStreamSequencerSvc.cxx:208
OutputStreamSequencerSvc::m_lastIncident
std::string m_lastIncident
Last incident type that was handled.
Definition: OutputStreamSequencerSvc.h:95
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
OutputStreamSequencerSvc::initialize
virtual StatusCode initialize() override final
Required of all Gaudi services:
Definition: OutputStreamSequencerSvc.cxx:30
beamspotman.n
n
Definition: beamspotman.py:731
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
OutputStreamSequencerSvc::m_reportingOn
BooleanProperty m_reportingOn
Flag to switch on storage of reporting info in fnToRangeId.
Definition: OutputStreamSequencerSvc.h:105
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
AthService
Definition: AthService.h:32
OutputStreamSequencerSvc::queryInterface
virtual StatusCode queryInterface(const InterfaceID &riid, void **ppvInterface) override final
Required of all Gaudi services: see Gaudi documentation for details.
Definition: OutputStreamSequencerSvc.cxx:72
OutputStreamSequencerSvc::m_metaDataSvc
ServiceHandle< MetaDataSvc > m_metaDataSvc
Definition: OutputStreamSequencerSvc.h:83
OutputStreamSequencerSvc::m_rangeIDinSlot
std::vector< std::string > m_rangeIDinSlot
EventRange ID for all slots.
Definition: OutputStreamSequencerSvc.h:98
OutputStreamSequencerSvc::currentRangeID
std::string currentRangeID() const
The current Event Range ID (only one range is returned)
Definition: OutputStreamSequencerSvc.cxx:197
OutputStreamSequencerSvc::m_currentRangeID
std::string m_currentRangeID
Current EventRange ID constructed on the last NextRange incident.
Definition: OutputStreamSequencerSvc.h:89
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
OutputStreamSequencerSvc::handle
virtual void handle(const Incident &) override final
Incident service handle.
Definition: OutputStreamSequencerSvc.cxx:94
OutputStreamSequencerSvc::getRangeReport
RangeReport_ptr getRangeReport()
Definition: OutputStreamSequencerSvc.cxx:228
OutputStreamSequencerSvc::m_fileSequenceNumber
int m_fileSequenceNumber
The event sequence number.
Definition: OutputStreamSequencerSvc.h:86
MetaDataSvc.h
This file contains the class definition for the MetaDataSvc class.
OutputStreamSequencerSvc::m_finishedRange
std::map< std::string, std::string >::iterator m_finishedRange
Definition: OutputStreamSequencerSvc.h:109
OutputStreamSequencerSvc::interfaceID
static const InterfaceID & interfaceID()
Retrieve interface ID.
Definition: OutputStreamSequencerSvc.h:46
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
OutputStreamSequencerSvc::publishRangeReport
void publishRangeReport(const std::string &outputFile)
Definition: OutputStreamSequencerSvc.cxx:222
OutputStreamSequencerSvc::finalize
virtual StatusCode finalize() override final
Required of all Gaudi services:
Definition: OutputStreamSequencerSvc.cxx:64
OutputStreamSequencerSvc::~OutputStreamSequencerSvc
virtual ~OutputStreamSequencerSvc()
Destructor.
Definition: OutputStreamSequencerSvc.cxx:27
OutputStreamSequencerSvc::buildSequenceFileName
std::string buildSequenceFileName(const std::string &)
Returns sequenced file name for output stream.
Definition: OutputStreamSequencerSvc.cxx:170
ServiceHandle< IIncidentSvc >