ATLAS Offline Software
OutputStreamSequencerSvc Class Reference

This class provides configuration properties to enable OutputStream file sequences.

#include <OutputStreamSequencerSvc.h>


Public Types

typedef std::pair< std::string, std::string > RangeReport_t
 
typedef std::unique_ptr< RangeReport_t > RangeReport_ptr
 

Public Member Functions

 OutputStreamSequencerSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor.

virtual ~OutputStreamSequencerSvc ()
 Destructor.

virtual StatusCode initialize () override final
 Required of all Gaudi services.

virtual StatusCode finalize () override final
 Required of all Gaudi services.

virtual void handle (const Incident &) override final
 Incident service handle.

std::string buildSequenceFileName (const std::string &)
 Returns sequenced file name for output stream.

void publishRangeReport (const std::string &outputFile)

RangeReport_ptr getRangeReport ()

std::string incidentName () const
 The name of the incident that starts a new event sequence.

std::string currentRangeID () const
 The current Event Range ID (only one range is returned)

std::string setRangeID (const std::string &rangeID)
 Sets the RangeID (possibly temporarily) so that the correct range file name can be generated.

bool inUse () const
 Is the service in active use? (true after the first range incident is handled)

const std::string & lastIncident ()
 Last incident type that was handled.
 

Static Public Member Functions

static bool inConcurrentEventsMode ()
 Are there concurrent events? (numConcurrentEvents() > 1)
 

Private Attributes

ServiceHandle< MetaDataSvc > m_metaDataSvc

int m_fileSequenceNumber {}
 The event sequence number.

std::string m_currentRangeID
 Current EventRange ID constructed on the last NextRange incident.

std::string m_lastFileName
 Recently constructed full file name (useful in single threaded processing)

std::string m_lastIncident
 Last incident type that was handled.

std::vector< std::string > m_rangeIDinSlot
 EventRange ID for all slots.

StringProperty m_incidentName
 SequenceIncidentName, incident name for triggering file sequencing.

BooleanProperty m_reportingOn
 Flag to switch on storage of reporting info in m_fnToRangeId.

BooleanProperty m_replaceRangeMode
 Flag to enable ReplaceRangeMode (everything between [ and ], inclusive, is replaced with the range_idth element of that list).

std::map< std::string, std::string > m_fnToRangeId

std::map< std::string, std::string >::iterator m_finishedRange {}

std::mutex m_mutex
 

Detailed Description

This class provides configuration properties to enable OutputStream file sequences.

Definition at line 29 of file OutputStreamSequencerSvc.h.

Member Typedef Documentation

◆ RangeReport_ptr

typedef std::unique_ptr<RangeReport_t> OutputStreamSequencerSvc::RangeReport_ptr

Definition at line 35 of file OutputStreamSequencerSvc.h.

◆ RangeReport_t

typedef std::pair<std::string,std::string> OutputStreamSequencerSvc::RangeReport_t

Definition at line 34 of file OutputStreamSequencerSvc.h.

Constructor & Destructor Documentation

◆ OutputStreamSequencerSvc()

OutputStreamSequencerSvc::OutputStreamSequencerSvc ( const std::string &  name,
ISvcLocator *  pSvcLocator 
)

Standard Service Constructor.

Definition at line 24 of file OutputStreamSequencerSvc.cxx.

25  : base_class(name, pSvcLocator),
26  m_metaDataSvc("MetaDataSvc", name),
28 {
29 }

◆ ~OutputStreamSequencerSvc()

OutputStreamSequencerSvc::~OutputStreamSequencerSvc ( )
virtual

Destructor.

Definition at line 32 of file OutputStreamSequencerSvc.cxx.

32  {
33 }

Member Function Documentation

◆ buildSequenceFileName()

std::string OutputStreamSequencerSvc::buildSequenceFileName ( const std::string &  orgFileName)

Returns sequenced file name for output stream.

Definition at line 176 of file OutputStreamSequencerSvc.cxx.

177 {
178  if( !inUse() ) {
179  // Event sequences not in use, just return the original filename
180  return orgFileName;
181  }
182  std::string rangeID = currentRangeID();
183  std::lock_guard lockg( m_mutex );
184  if (!m_replaceRangeMode) {
185  // build the full output file name for this event range
186  std::string fileNameCore = orgFileName, fileNameExt;
187  std::size_t sepPos = orgFileName.find('[');
188  if (sepPos != std::string::npos) {
189  fileNameCore = orgFileName.substr(0, sepPos);
190  fileNameExt = orgFileName.substr(sepPos);
191  }
192  std::ostringstream n;
193  n << fileNameCore << "." << rangeID << fileNameExt;
194  m_lastFileName = n.str();
195  } else {
196  std::string_view origFileNameView = orgFileName;
197  std::size_t open = origFileNameView.find('[');
198  std::size_t close = origFileNameView.find(']');
199  // If we don't find a [ ] enclosed section, just append the rangeID to the
200  // end
201  if (open == std::string_view::npos || close == std::string_view::npos) {
202  m_lastFileName = std::format("{}.{}", origFileNameView, rangeID);
203  } else {
204  // build list of elems to substitute from
205  ATH_MSG_DEBUG("Building element list");
206  std::vector<std::string_view> elems{};
207  std::size_t pos = open + 1;
208  for (std::size_t comma = origFileNameView.find(',', pos);
209  comma < close;
210  comma = origFileNameView.find(',', pos)) {
211  std::string_view item = origFileNameView.substr(pos, comma - pos);
212  ATH_MSG_DEBUG("(start) pos = " << pos << ", (end) comma = " << comma << ", item = " << item);
213  elems.push_back(item);
214  pos = comma + 1;
215  }
216  std::string_view last_item = origFileNameView.substr(pos, close - pos);
217  ATH_MSG_DEBUG("(start) pos = " << pos << ", (end) close = " << close << ", item = " << last_item);
218  elems.push_back(last_item);
219  // substitute
220  std::size_t rangeIdx{};
221  auto rangeIdxParseRes = std::from_chars(
222  rangeID.data(), rangeID.data() + rangeID.size(), rangeIdx);
223  if (rangeIdxParseRes.ec != std::errc()) {
225  "Error parsing rangeID to integer. Replacing [] list with "
226  "rangeID.");
228  std::format("{}{}{}", origFileNameView.substr(0, open), rangeID,
229  origFileNameView.substr(close + 1));
230  } else if (rangeIdx >= elems.size()) {
232  "Number of elements in [] list <= rangeID. Replacing [] list with "
233  "rangeID.");
235  std::format("{}{}{}", origFileNameView.substr(0, open), rangeID,
236  origFileNameView.substr(close + 1));
237  } else {
239  "{}{}{}", origFileNameView.substr(0, open), elems.at(rangeIdx),
240  origFileNameView.substr(close + 1));
241  ATH_MSG_DEBUG("Output file: " << m_lastFileName);
242  }
243  }
244  }
245 
246  if( m_reportingOn.value() ) {
247  m_fnToRangeId.insert( std::pair(m_lastFileName, rangeID) );
248  }
249 
250  return m_lastFileName;
251 }
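As a rough illustration of the default branch above (m_replaceRangeMode off), the naming step can be reproduced outside Athena. This is only a sketch: the function name `appendRangeID` is hypothetical and not part of the service, and it mirrors just the split at the first '[' shown in the listing.

```cpp
#include <string>

// Hypothetical helper (not part of OutputStreamSequencerSvc): insert
// ".<rangeID>" in front of an optional "[...]" suffix of the file name,
// as the default branch of buildSequenceFileName() does.
std::string appendRangeID(const std::string& orgFileName, const std::string& rangeID)
{
    std::string core = orgFileName;
    std::string ext;
    const std::size_t sepPos = orgFileName.find('[');
    if (sepPos != std::string::npos) {
        core = orgFileName.substr(0, sepPos);  // everything before '['
        ext = orgFileName.substr(sepPos);      // '[' and everything after it
    }
    return core + "." + rangeID + ext;
}
```

With this helper, a name without brackets simply gains a ".rangeID" suffix, while a name carrying a bracketed section keeps that section at the end.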

◆ currentRangeID()

std::string OutputStreamSequencerSvc::currentRangeID ( ) const

The current Event Range ID (only one range is returned)

Definition at line 254 of file OutputStreamSequencerSvc.cxx.

255 {
256  if( !inUse() ) return "";
257  auto slot = Gaudi::Hive::currentContext().slot();
258  if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
259  std::lock_guard lockg( m_mutex );
260  if( slot >= m_rangeIDinSlot.size() ) return "";
261  return m_rangeIDinSlot[ slot ];
262 }
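The per-slot bookkeeping used here can be pictured with a small standalone class. This is a simplified stand-in, assuming nothing about Gaudi: the class name `SlotRangeIDs` and its methods are hypothetical, and the slot index is passed in explicitly instead of being read from the event context.

```cpp
#include <cstddef>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for the m_rangeIDinSlot bookkeeping: one range ID per
// event slot, guarded by a mutex. Unknown slots yield an empty string, as
// currentRangeID() does for a slot beyond the vector size.
class SlotRangeIDs {
public:
    void assign(std::size_t slot, const std::string& rangeID) {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (slot >= m_ids.size()) m_ids.resize(slot + 1);  // late resize
        m_ids[slot] = rangeID;
    }
    std::string get(std::size_t slot) const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return slot < m_ids.size() ? m_ids[slot] : std::string{};
    }
private:
    mutable std::mutex m_mutex;  // mutable so that get() can stay const
    std::vector<std::string> m_ids;
};
```

Returning the string by value, rather than by reference, means the caller never holds a reference into the vector after the lock is released.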

◆ finalize()

StatusCode OutputStreamSequencerSvc::finalize ( )
final override virtual

Required of all Gaudi services.

Definition at line 70 of file OutputStreamSequencerSvc.cxx.

70  {
71  // Release MetaDataSvc
72  if (!m_metaDataSvc.release().isSuccess()) {
73  ATH_MSG_WARNING("Cannot release MetaDataSvc.");
74  }
75  return(StatusCode::SUCCESS);
76 }

◆ getRangeReport()

OutputStreamSequencerSvc::RangeReport_ptr OutputStreamSequencerSvc::getRangeReport ( )

Definition at line 285 of file OutputStreamSequencerSvc.cxx.

286 {
288  if( !m_reportingOn.value() ) {
289  ATH_MSG_WARNING("Reporting not turned on - set " << m_reportingOn.name() << " to True");
290  } else {
291  std::lock_guard lockg( m_mutex );
292  if(m_finishedRange!=m_fnToRangeId.end()) {
293  report = std::make_unique<RangeReport_t>(m_finishedRange->second,m_finishedRange->first);
296  }
297  }
298  return report;
299 }
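Note the argument order in the make_unique call above: the report pair is (range ID, file name), the reverse of the m_fnToRangeId key/value order. The sketch below shows that shape in isolation. The free function `makeRangeReport` is hypothetical, and the lookup by output file name is an assumption, since the listings for publishRangeReport() and getRangeReport() are incomplete here.

```cpp
#include <map>
#include <memory>
#include <string>
#include <utility>

using RangeReport_t = std::pair<std::string, std::string>;
using RangeReport_ptr = std::unique_ptr<RangeReport_t>;

// Hypothetical free function: look up an output file in a
// file-name -> rangeID map and build a (rangeID, fileName) report.
// Returns nullptr when the file name is not in the map.
RangeReport_ptr makeRangeReport(const std::map<std::string, std::string>& fnToRangeId,
                                const std::string& outputFile)
{
    const auto it = fnToRangeId.find(outputFile);
    if (it == fnToRangeId.end()) return nullptr;
    // first = range ID (map value), second = file name (map key)
    return std::make_unique<RangeReport_t>(it->second, it->first);
}
```

A caller would test the returned pointer before use, since a null report is the "nothing to report" case.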

◆ handle()

void OutputStreamSequencerSvc::handle ( const Incident &  inc)
final override virtual

Incident service handle.

Definition at line 90 of file OutputStreamSequencerSvc.cxx.

91 {
92  auto slot = Gaudi::Hive::currentContext().slot();
93  bool has_context = ( slot != EventContext::INVALID_CONTEXT_ID );
94  // in AthenaSP there is no context so go with the first slot
95  if( !has_context ) slot = 0;
96  m_lastIncident = inc.type();
97  ATH_MSG_INFO("Handling incident of type " << m_lastIncident << " for slot=" << slot
98  << (!has_context? " NO event context":"") );
99 
100  if( inc.type() == incidentName() ) { // NextEventRange
101  std::string rangeID;
102  const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
103  if (fileInc != nullptr) {
104  rangeID = fileInc->fileName();
105  // Handle BeginInputFile
106  if (inc.type() == IncidentType::BeginInputFile) {
107  rangeID = "INFILE";
108  }
110  "Requested (through incident) Next Event Range filename extension: "
111  << rangeID);
112  }
113 
114  if( rangeID == "dummy" ) {
115  if( not inConcurrentEventsMode() ) {
116  // finish the previous Range here only in SEQUENTIAL (threads<2) event processing
117  // Write metadata on the incident finishing a Range (filename=="dummy") in ES MP
118  ATH_MSG_DEBUG("MetaData transition");
119  // immediate write and disconnect for ES, otherwise do it after Event write is done
120  bool disconnect { true };
121  std::lock_guard lockg( m_mutex );
122  if( !m_metaDataSvc->transitionMetaDataFile( m_lastFileName, disconnect ).isSuccess() ) {
123  throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
124  }
125  }
126  // exit now, wait for the next (real) incident that will start the next range
127  return;
128  }
129  {
130  // start a new range
131  std::lock_guard lockg( m_mutex );
133  if( rangeID.empty() ) {
134  std::ostringstream n;
135  n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber;
136  rangeID = n.str();
137  ATH_MSG_DEBUG("Default next event range filename extension: " << rangeID);
138  }
139  else if (rangeID == "INFILE") {
141  }
142  if( slot >= m_rangeIDinSlot.size() ) {
143  // MN - late resize, is there a better place for it?
144  m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
145  }
146  // from now on new events will use the new rangeID
147  m_currentRangeID = rangeID;
148  // for ESMT these incidents are asynchronous, so wait for BeginProcessing to update the range map
149  if( not inConcurrentEventsMode() or has_context ) {
150  m_rangeIDinSlot[ slot ] = std::move(rangeID);
151  }
152  }
153  if( not inConcurrentEventsMode() and not fileInc ) {
154  // non-file incident case (filename=="") in regular SP LoopMgr
155  ATH_MSG_DEBUG("MetaData transition");
156  bool disconnect { false };
157  // MN: may not know the full filename yet, but that is only needed for disconnect==true
158  if( !m_metaDataSvc->transitionMetaDataFile( "" /*m_lastFileName*/, disconnect ).isSuccess() ) {
159  throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
160  }
161  }
162  }
163  else if( inc.type() == IncidentType::BeginProcessing ) {
164  // new event start - assign current rangeId to its slot
165  std::lock_guard lockg( m_mutex );
166  ATH_MSG_DEBUG("Assigning rangeID = " << m_currentRangeID << " to slot " << slot);
167  // If this service is enabled but not getting NextRange incidents, need to resize here
168  if( slot >= m_rangeIDinSlot.size() ) {
169  m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
170  }
172  }
173 }
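When the incident carries no range ID, the listing above builds a default one from m_fileSequenceNumber. A minimal sketch of that formatting step follows; the function name `defaultRangeID` is hypothetical, and how the sequence number is advanced is not visible in the listing, so the number is simply passed in.

```cpp
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical helper reproducing the default range ID format from handle():
// an underscore followed by the sequence number, zero-padded to four digits.
std::string defaultRangeID(int fileSequenceNumber)
{
    std::ostringstream n;
    n << "_" << std::setw(4) << std::setfill('0') << fileSequenceNumber;
    return n.str();
}
```

Because std::setw only sets a minimum width, sequence numbers with more than four digits are written in full rather than truncated.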

◆ incidentName()

std::string OutputStreamSequencerSvc::incidentName ( ) const
inline

The name of the incident that starts a new event sequence.

Definition at line 59 of file OutputStreamSequencerSvc.h.

59 { return m_incidentName.value(); }

◆ inConcurrentEventsMode()

bool OutputStreamSequencerSvc::inConcurrentEventsMode ( )
static

Are there concurrent events? (numConcurrentEvents() > 1)

Definition at line 79 of file OutputStreamSequencerSvc.cxx.

79  {
80  return Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() > 1;
81 }

◆ initialize()

StatusCode OutputStreamSequencerSvc::initialize ( )
final override virtual

Required of all Gaudi services.

Definition at line 35 of file OutputStreamSequencerSvc.cxx.

35  {
36  ATH_MSG_DEBUG("Initializing " << name());
37 
38  // Set to be listener for end of event
39  ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name());
40  if (!incsvc.retrieve().isSuccess()) {
41  ATH_MSG_FATAL("Cannot get IncidentSvc.");
42  return(StatusCode::FAILURE);
43  }
44  if( !incidentName().empty() ) {
45  incsvc->addListener(this, incidentName(), 100);
46  incsvc->addListener(this, IncidentType::BeginProcessing, 100);
47  ATH_MSG_DEBUG("Listening to " << incidentName() << " incidents" );
48  ATH_MSG_DEBUG("Reporting is " << (m_reportingOn.value()? "ON" : "OFF") );
49  // Retrieve MetaDataSvc
50  if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) {
51  ATH_MSG_ERROR("Cannot get MetaDataSvc");
52  return StatusCode::FAILURE;
53  }
54  }
55 
56  if( inConcurrentEventsMode() ) {
57  ATH_MSG_DEBUG("Concurrent events mode");
58  } else {
59  ATH_MSG_VERBOSE("Sequential events mode");
60  }
61 
62  // Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() not set yet
63  // m_rangeIDinSlot.resize( );
64  std::lock_guard lockg( m_mutex );
66 
67  return(StatusCode::SUCCESS);
68 }

◆ inUse()

bool OutputStreamSequencerSvc::inUse ( ) const

Is the service in active use? (true after the first range incident is handled)

Definition at line 84 of file OutputStreamSequencerSvc.cxx.

84  {
85  std::lock_guard lockg( m_mutex );
86  return m_fileSequenceNumber >= 0;
87 }

◆ lastIncident()

const std::string& OutputStreamSequencerSvc::lastIncident ( )
inline

Last incident type that was handled.

Definition at line 74 of file OutputStreamSequencerSvc.h.

74 { return m_lastIncident; }

◆ publishRangeReport()

void OutputStreamSequencerSvc::publishRangeReport ( const std::string &  outputFile)

Definition at line 279 of file OutputStreamSequencerSvc.cxx.

280 {
281  std::lock_guard lockg( m_mutex );
283 }

◆ setRangeID()

std::string OutputStreamSequencerSvc::setRangeID ( const std::string &  rangeID)

Sets the RangeID for the current slot (possibly temporarily) so that the correct range file name can be generated. Returns the RangeID previously stored for that slot.

Definition at line 265 of file OutputStreamSequencerSvc.cxx.

266 {
267  auto slot = Gaudi::Hive::currentContext().slot();
268  if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
269  std::lock_guard lockg( m_mutex );
270  if( slot >= m_rangeIDinSlot.size() ) {
271  throw std::runtime_error("OutputStreamSequencer::setRangeID(): slot out of range");
272  }
273  std::string oldrange = m_rangeIDinSlot[ slot ];
274  m_rangeIDinSlot[ slot ] = rangeID;
275  return oldrange;
276 }
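Because setRangeID() returns the previous value, a temporary override can be undone by passing that value back. One way to package this is a small scope guard, sketched below. Both `ScopedRangeID` and `FakeSequencer` are hypothetical names introduced for illustration; the guard works with any type offering a compatible setRangeID().

```cpp
#include <string>
#include <utility>

// Hypothetical scope guard: sets a range ID on construction and restores the
// previous one on destruction. Svc is any type providing
//   std::string setRangeID(const std::string&)
// that returns the previously stored ID.
template <typename Svc>
class ScopedRangeID {
public:
    ScopedRangeID(Svc& svc, const std::string& rangeID)
        : m_svc(svc), m_old(svc.setRangeID(rangeID)) {}
    ~ScopedRangeID() { m_svc.setRangeID(m_old); }
    ScopedRangeID(const ScopedRangeID&) = delete;
    ScopedRangeID& operator=(const ScopedRangeID&) = delete;
private:
    Svc& m_svc;
    std::string m_old;  // value to restore when the guard goes out of scope
};

// Minimal stand-in for the service, for illustration only.
struct FakeSequencer {
    std::string current;
    std::string setRangeID(const std::string& id) { return std::exchange(current, id); }
};
```

One caveat for real use: the setRangeID() shown above throws when the slot is out of range, so a guard like this would need to make sure the restoring call in the destructor cannot throw.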

Member Data Documentation

◆ m_currentRangeID

std::string OutputStreamSequencerSvc::m_currentRangeID
private

Current EventRange ID constructed on the last NextRange incident.

Definition at line 83 of file OutputStreamSequencerSvc.h.

◆ m_fileSequenceNumber

int OutputStreamSequencerSvc::m_fileSequenceNumber {}
private

The event sequence number.

Definition at line 80 of file OutputStreamSequencerSvc.h.

◆ m_finishedRange

std::map<std::string,std::string>::iterator OutputStreamSequencerSvc::m_finishedRange {}
private

Definition at line 111 of file OutputStreamSequencerSvc.h.

◆ m_fnToRangeId

std::map<std::string,std::string> OutputStreamSequencerSvc::m_fnToRangeId
private

Definition at line 110 of file OutputStreamSequencerSvc.h.

◆ m_incidentName

StringProperty OutputStreamSequencerSvc::m_incidentName
private
Initial value:
{this, "SequenceIncidentName", "",
"Name of the incident that signals the next Event Range start" }

SequenceIncidentName, incident name for triggering file sequencing.

Definition at line 96 of file OutputStreamSequencerSvc.h.

◆ m_lastFileName

std::string OutputStreamSequencerSvc::m_lastFileName
private

Recently constructed full file name (useful in single threaded processing)

Definition at line 86 of file OutputStreamSequencerSvc.h.

◆ m_lastIncident

std::string OutputStreamSequencerSvc::m_lastIncident
private

Last incident type that was handled.

Definition at line 89 of file OutputStreamSequencerSvc.h.

◆ m_metaDataSvc

ServiceHandle<MetaDataSvc> OutputStreamSequencerSvc::m_metaDataSvc
private

Definition at line 77 of file OutputStreamSequencerSvc.h.

◆ m_mutex

std::mutex OutputStreamSequencerSvc::m_mutex
mutable private

Definition at line 113 of file OutputStreamSequencerSvc.h.

◆ m_rangeIDinSlot

std::vector<std::string> OutputStreamSequencerSvc::m_rangeIDinSlot
private

EventRange ID for all slots.

Definition at line 92 of file OutputStreamSequencerSvc.h.

◆ m_replaceRangeMode

BooleanProperty OutputStreamSequencerSvc::m_replaceRangeMode
private
Initial value:
{
this, "ReplaceRangeMode", false,
"If True, everything between [ and ] in the output filename is treated "
"as a comma-separated list, and the range_idth element of the list is "
"selected"}

Flag to enable ReplaceRangeMode (i.e. everything between [ and ] (inclusive) is replaced with the range_idth element of that list)

Definition at line 104 of file OutputStreamSequencerSvc.h.
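To make the ReplaceRangeMode behaviour concrete, here is a standalone sketch of the selection logic, modelled on the corresponding branch of buildSequenceFileName(). The function name `replaceRange` is hypothetical, plain string concatenation stands in for std::format, and logging is omitted.

```cpp
#include <charconv>
#include <string>
#include <string_view>
#include <system_error>
#include <vector>

// Hypothetical helper: pick the rangeID-th element (zero-based) of a
// "[a,b,c]" list in the file name. If rangeID is not a usable index, the
// bracketed list is replaced with rangeID itself; if there is no bracketed
// list at all, ".<rangeID>" is appended.
std::string replaceRange(std::string_view fileName, const std::string& rangeID)
{
    const std::size_t open = fileName.find('[');
    const std::size_t close = fileName.find(']');
    if (open == std::string_view::npos || close == std::string_view::npos) {
        return std::string(fileName) + "." + rangeID;
    }
    // Split the text between the brackets on commas.
    std::vector<std::string_view> elems;
    std::size_t pos = open + 1;
    for (std::size_t comma = fileName.find(',', pos); comma < close;
         comma = fileName.find(',', pos)) {
        elems.push_back(fileName.substr(pos, comma - pos));
        pos = comma + 1;
    }
    elems.push_back(fileName.substr(pos, close - pos));

    // Interpret rangeID as an index into the list.
    std::size_t idx = 0;
    const auto res = std::from_chars(rangeID.data(), rangeID.data() + rangeID.size(), idx);
    const bool usable = (res.ec == std::errc()) && idx < elems.size();
    const std::string_view chosen = usable ? elems[idx] : std::string_view(rangeID);

    std::string out(fileName.substr(0, open));
    out += chosen;
    out += fileName.substr(close + 1);
    return out;
}
```

For example, with the file name "out[a.root,b.root,c.root]" a range ID of "1" selects "b.root", while a non-numeric or too-large range ID is substituted for the bracketed list verbatim.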

◆ m_reportingOn

BooleanProperty OutputStreamSequencerSvc::m_reportingOn
private
Initial value:
{this, "ReportingOn", false,
"If True, keep info about Ranges for getRangeReport() calls"}

Flag to switch on storage of reporting info in m_fnToRangeId.

Definition at line 99 of file OutputStreamSequencerSvc.h.


The documentation for this class was generated from the following files:
OutputStreamSequencerSvc.h
OutputStreamSequencerSvc.cxx