ATLAS Offline Software
Loading...
Searching...
No Matches
OutputStreamSequencerSvc Class Reference

This class provides configuration properties to enable OutputStream file sequences. More...

#include <OutputStreamSequencerSvc.h>

Inheritance diagram for OutputStreamSequencerSvc:
Collaboration diagram for OutputStreamSequencerSvc:

Public Types

typedef std::pair< std::string, std::string > RangeReport_t
typedef std::unique_ptr< RangeReport_tRangeReport_ptr

Public Member Functions

 OutputStreamSequencerSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor.
virtual ~OutputStreamSequencerSvc ()
 Destructor.
virtual StatusCode initialize () override final
 Required of all Gaudi services:
virtual StatusCode finalize () override final
 Required of all Gaudi services:
virtual void handle (const Incident &) override final
 Incident service handle.
std::string buildSequenceFileName (const EventContext &ctx, const std::string &)
 Returns sequenced file name for output stream.
void publishRangeReport (const std::string &outputFile)
RangeReport_ptr getRangeReport ()
std::string incidentName () const
 The name of the incident that starts a new event sequence.
std::string currentRangeID (const EventContext &ctx) const
 The current Event Range ID (only one range is returned).
std::string setRangeID (const EventContext &ctx, const std::string &rangeID)
 set the RangeID (possibly temporarily) so the right Range Filename may be generated
bool inUse () const
 Is the service in active use? (true after the first range incident is handled).
const std::string & lastIncident ()
 Last incident type that was handled.

Static Public Member Functions

static bool inConcurrentEventsMode ()
 Are there concurrent events? (threads>1).

Private Attributes

ServiceHandle< MetaDataSvcm_metaDataSvc
int m_fileSequenceNumber {}
 The event sequence number.
std::string m_currentRangeID
 Current EventRange ID constructed on the last NextRange incident.
std::string m_lastFileName
 Recently constructed full file name (useful in single threaded processing).
std::string m_lastIncident
 Last incident type that was handled.
SG::SlotSpecificObj< std::string, SG::InvalidSlot::Enabledm_rangeIDinSlot
 EventRange ID for all slots.
StringProperty m_incidentName
 SequenceIncidentName, incident name for triggering file sequencing.
BooleanProperty m_reportingOn
 Flag to switch on storage of reporting info in fnToRangeId.
BooleanProperty m_replaceRangeMode
 Flag to put in ReplaceRangeMode (i.e.
std::map< std::string, std::string > m_fnToRangeId
std::map< std::string, std::string >::iterator m_finishedRange {}
std::mutex m_mutex

Detailed Description

This class provides configuration properties to enable OutputStream file sequences.

Definition at line 30 of file OutputStreamSequencerSvc.h.

Member Typedef Documentation

◆ RangeReport_ptr

Definition at line 36 of file OutputStreamSequencerSvc.h.

◆ RangeReport_t

typedef std::pair<std::string,std::string> OutputStreamSequencerSvc::RangeReport_t

Definition at line 35 of file OutputStreamSequencerSvc.h.

Constructor & Destructor Documentation

◆ OutputStreamSequencerSvc()

OutputStreamSequencerSvc::OutputStreamSequencerSvc ( const std::string & name,
ISvcLocator * pSvcLocator )

Standard Service Constructor.

Definition at line 24 of file OutputStreamSequencerSvc.cxx.

25 : base_class(name, pSvcLocator),
26 m_metaDataSvc("MetaDataSvc", name),
28{
29}
ServiceHandle< MetaDataSvc > m_metaDataSvc
int m_fileSequenceNumber
The event sequence number.

◆ ~OutputStreamSequencerSvc()

OutputStreamSequencerSvc::~OutputStreamSequencerSvc ( )
virtual

Destructor.

Definition at line 32 of file OutputStreamSequencerSvc.cxx.

32 {
33}

Member Function Documentation

◆ buildSequenceFileName()

std::string OutputStreamSequencerSvc::buildSequenceFileName ( const EventContext & ctx,
const std::string & orgFileName )

Returns sequenced file name for output stream.

Definition at line 161 of file OutputStreamSequencerSvc.cxx.

162{
163 if( !inUse() ) {
164 // Event sequences not in use, just return the original filename
165 return orgFileName;
166 }
167 std::string rangeID = currentRangeID(ctx);
168 std::lock_guard lockg( m_mutex );
169 if (!m_replaceRangeMode) {
170 // build the full output file name for this event range
171 std::string fileNameCore = orgFileName, fileNameExt;
172 std::size_t sepPos = orgFileName.find('[');
173 if (sepPos != std::string::npos) {
174 fileNameCore = orgFileName.substr(0, sepPos);
175 fileNameExt = orgFileName.substr(sepPos);
176 }
177 std::ostringstream n;
178 n << fileNameCore << "." << rangeID << fileNameExt;
179 m_lastFileName = n.str();
180 } else {
181 std::string_view origFileNameView = orgFileName;
182 std::size_t open = origFileNameView.find('[');
183 std::size_t close = origFileNameView.find(']');
184 // If we don't find a [ ] enclosed section, just append the rangeID to the
185 // end
186 if (open == std::string_view::npos || close == std::string_view::npos) {
187 m_lastFileName = std::format("{}.{}", origFileNameView, rangeID);
188 } else {
189 // build list of elems to substitute from
190 ATH_MSG_DEBUG("Building element list");
191 std::vector<std::string_view> elems{};
192 std::size_t pos = open + 1;
193 for (std::size_t comma = origFileNameView.find(',', pos);
194 comma < close;
195 comma = origFileNameView.find(',', pos)) {
196 std::string_view item = origFileNameView.substr(pos, comma - pos);
197 ATH_MSG_DEBUG("(start) pos = " << pos << ", (end) comma = " << comma << ", item = " << item);
198 elems.push_back(item);
199 pos = comma + 1;
200 }
201 std::string_view last_item = origFileNameView.substr(pos, close - pos);
202 ATH_MSG_DEBUG("(start) pos = " << pos << ", (end) close = " << close << ", item = " << last_item);
203 elems.push_back(last_item);
204 // substitute
205 std::size_t rangeIdx{};
206 auto rangeIdxParseRes = std::from_chars(
207 rangeID.data(), rangeID.data() + rangeID.size(), rangeIdx);
208 if (rangeIdxParseRes.ec != std::errc()) {
210 "Error parsing rangeID to integer. Replacing [] list with "
211 "rangeID.");
213 std::format("{}{}{}", origFileNameView.substr(0, open), rangeID,
214 origFileNameView.substr(close + 1));
215 } else if (rangeIdx >= elems.size()) {
217 "Number of elements in [] list <= rangeID. Replacing [] list with "
218 "rangeID.");
220 std::format("{}{}{}", origFileNameView.substr(0, open), rangeID,
221 origFileNameView.substr(close + 1));
222 } else {
223 m_lastFileName = std::format(
224 "{}{}{}", origFileNameView.substr(0, open), elems.at(rangeIdx),
225 origFileNameView.substr(close + 1));
226 ATH_MSG_DEBUG("Output file: " << m_lastFileName);
227 }
228 }
229 }
230
231 if( m_reportingOn.value() ) {
232 m_fnToRangeId.insert( std::pair(m_lastFileName, rangeID) );
233 }
234
235 return m_lastFileName;
236}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
bool inUse() const
Is the service in active use? (true after the first range incident is handled).
std::string m_lastFileName
Recently constructed full file name (useful in single threaded processing).
std::string currentRangeID(const EventContext &ctx) const
The current Event Range ID (only one range is returned).
BooleanProperty m_replaceRangeMode
Flag to put in ReplaceRangeMode (i.e.
BooleanProperty m_reportingOn
Flag to switch on storage of reporting info in fnToRangeId.
std::map< std::string, std::string > m_fnToRangeId
@ open
Definition BinningType.h:40

◆ currentRangeID()

std::string OutputStreamSequencerSvc::currentRangeID ( const EventContext & ctx) const

The current Event Range ID (only one range is returned).

Definition at line 239 of file OutputStreamSequencerSvc.cxx.

240{
241 if( !inUse() ) return "";
242 return *m_rangeIDinSlot.get(ctx);
243}
SG::SlotSpecificObj< std::string, SG::InvalidSlot::Enabled > m_rangeIDinSlot
EventRange ID for all slots.

◆ finalize()

StatusCode OutputStreamSequencerSvc::finalize ( )
finaloverridevirtual

Required of all Gaudi services:

Definition at line 67 of file OutputStreamSequencerSvc.cxx.

67 {
68 // Release MetaDataSvc
69 if (!m_metaDataSvc.release().isSuccess()) {
70 ATH_MSG_WARNING("Cannot release MetaDataSvc.");
71 }
72 return(StatusCode::SUCCESS);
73}

◆ getRangeReport()

OutputStreamSequencerSvc::RangeReport_ptr OutputStreamSequencerSvc::getRangeReport ( )

Definition at line 261 of file OutputStreamSequencerSvc.cxx.

262{
264 if( !m_reportingOn.value() ) {
265 ATH_MSG_WARNING("Reporting not turned on - set " << m_reportingOn.name() << " to True");
266 } else {
267 std::lock_guard lockg( m_mutex );
268 if(m_finishedRange!=m_fnToRangeId.end()) {
269 report = std::make_unique<RangeReport_t>(m_finishedRange->second,m_finishedRange->first);
272 }
273 }
274 return report;
275}
std::unique_ptr< RangeReport_t > RangeReport_ptr
std::map< std::string, std::string >::iterator m_finishedRange
list report
Definition checkTP.py:125

◆ handle()

void OutputStreamSequencerSvc::handle ( const Incident & inc)
finaloverridevirtual

Incident service handle.

Definition at line 87 of file OutputStreamSequencerSvc.cxx.

88{
89 const EventContext& ctx = inc.context();
90 m_lastIncident = inc.type();
91 ATH_MSG_INFO("Handling incident of type " << m_lastIncident << " for " << ctx);
92
93 if( inc.type() == incidentName() ) { // NextEventRange
94 std::string rangeID;
95 const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
96 if (fileInc != nullptr) {
97 rangeID = fileInc->fileName();
98 // Handle BeginInputFile
99 if (inc.type() == IncidentType::BeginInputFile) {
100 rangeID = "INFILE";
101 }
103 "Requested (through incident) Next Event Range filename extension: "
104 << rangeID);
105 }
106
107 if( rangeID == "dummy" ) {
108 if( not inConcurrentEventsMode() ) {
109 // finish the previous Range here only in SEQUENTIAL (threads<2) event processing
110 // Write metadata on the incident finishing a Range (filename=="dummy") in ES MP
111 ATH_MSG_DEBUG("MetaData transition");
112 // immediate write and disconnect for ES, otherwise do it after Event write is done
113 bool disconnect { true };
114 std::lock_guard lockg( m_mutex );
115 if( !m_metaDataSvc->transitionMetaDataFile( m_lastFileName, disconnect ).isSuccess() ) {
116 throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
117 }
118 }
119 // exit now, wait for the next (real) incident that will start the next range
120 return;
121 }
122 {
123 // start a new range
124 std::lock_guard lockg( m_mutex );
126 if( rangeID.empty() ) {
127 std::ostringstream n;
128 n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber;
129 rangeID = n.str();
130 ATH_MSG_DEBUG("Default next event range filename extension: " << rangeID);
131 }
132 else if (rangeID == "INFILE") {
133 rangeID = std::to_string(m_fileSequenceNumber);
134 }
135 // from now on new events will use the new rangeID
136 m_currentRangeID = rangeID;
137 // for ESMT these incidents are asynchronous, so wait for BeginProcessing to update the range map
138 if( not inConcurrentEventsMode() or ctx.valid() ) {
139 *m_rangeIDinSlot.get(ctx) = std::move(rangeID);
140 }
141 }
142 if( not inConcurrentEventsMode() and not fileInc ) {
143 // non-file incident case (filename=="") in regular SP LoopMgr
144 ATH_MSG_DEBUG("MetaData transition");
145 bool disconnect { false };
146 // MN: may not know the full filename yet, but that is only needed for disconnect==true
147 if( !m_metaDataSvc->transitionMetaDataFile( "" /*m_lastFileName*/, disconnect ).isSuccess() ) {
148 throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
149 }
150 }
151 }
152 else if( inc.type() == IncidentType::BeginProcessing ) {
153 // new event start - assing current rangeId to its slot
154 std::lock_guard lockg( m_mutex );
155 ATH_MSG_DEBUG("Assigning rangeID = " << m_currentRangeID << " to slot " << ctx.slot());
157 }
158}
#define ATH_MSG_INFO(x)
std::string incidentName() const
The name of the incident that starts a new event sequence.
std::string m_currentRangeID
Current EventRange ID constructed on the last NextRange incident.
static bool inConcurrentEventsMode()
Are there concurrent events? (threads>1).
std::string m_lastIncident
Last incident type that was handled.

◆ incidentName()

std::string OutputStreamSequencerSvc::incidentName ( ) const
inline

The name of the incident that starts a new event sequence.

Definition at line 60 of file OutputStreamSequencerSvc.h.

60{ return m_incidentName.value(); }
StringProperty m_incidentName
SequenceIncidentName, incident name for triggering file sequencing.

◆ inConcurrentEventsMode()

bool OutputStreamSequencerSvc::inConcurrentEventsMode ( )
static

Are there concurrent events? (threads>1).

Definition at line 76 of file OutputStreamSequencerSvc.cxx.

76 {
77 return Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() > 1;
78}

◆ initialize()

StatusCode OutputStreamSequencerSvc::initialize ( )
finaloverridevirtual

Required of all Gaudi services:

Definition at line 35 of file OutputStreamSequencerSvc.cxx.

35 {
36 ATH_MSG_DEBUG("Initializing " << name());
37
38 // Set to be listener for end of event
39 ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name());
40 if (!incsvc.retrieve().isSuccess()) {
41 ATH_MSG_FATAL("Cannot get IncidentSvc.");
42 return(StatusCode::FAILURE);
43 }
44 if( !incidentName().empty() ) {
45 incsvc->addListener(this, incidentName(), 100);
46 incsvc->addListener(this, IncidentType::BeginProcessing, 100);
47 ATH_MSG_DEBUG("Listening to " << incidentName() << " incidents" );
48 ATH_MSG_DEBUG("Reporting is " << (m_reportingOn.value()? "ON" : "OFF") );
49 // Retrieve MetaDataSvc
50 if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) {
51 ATH_MSG_ERROR("Cannot get MetaDataSvc");
52 return StatusCode::FAILURE;
53 }
54 }
55
57 ATH_MSG_DEBUG("Concurrent events mode");
58 } else {
59 ATH_MSG_VERBOSE("Sequential events mode");
60 }
61
63
64 return(StatusCode::SUCCESS);
65}
#define ATH_MSG_FATAL(x)
#define ATH_MSG_VERBOSE(x)
static const Attributes_t empty

◆ inUse()

bool OutputStreamSequencerSvc::inUse ( ) const

Is the service in active use? (true after the first range incident is handled).

Definition at line 81 of file OutputStreamSequencerSvc.cxx.

81 {
82 std::lock_guard lockg( m_mutex );
83 return m_fileSequenceNumber >= 0;
84}

◆ lastIncident()

const std::string & OutputStreamSequencerSvc::lastIncident ( )
inline

Last incident type that was handled.

Definition at line 75 of file OutputStreamSequencerSvc.h.

75{ return m_lastIncident; }

◆ publishRangeReport()

void OutputStreamSequencerSvc::publishRangeReport ( const std::string & outputFile)

Definition at line 255 of file OutputStreamSequencerSvc.cxx.

256{
257 std::lock_guard lockg( m_mutex );
258 m_finishedRange = m_fnToRangeId.find(outputFile);
259}

◆ setRangeID()

std::string OutputStreamSequencerSvc::setRangeID ( const EventContext & ctx,
const std::string & rangeID )

set the RangeID (possibly temporarily) so the right Range Filename may be generated

Definition at line 246 of file OutputStreamSequencerSvc.cxx.

247{
248 std::string* rangeid = m_rangeIDinSlot.get(ctx);
249 const std::string oldrange = *rangeid;
250 *rangeid = rangeID;
251 return oldrange;
252}

Member Data Documentation

◆ m_currentRangeID

std::string OutputStreamSequencerSvc::m_currentRangeID
private

Current EventRange ID constructed on the last NextRange incident.

Definition at line 84 of file OutputStreamSequencerSvc.h.

◆ m_fileSequenceNumber

int OutputStreamSequencerSvc::m_fileSequenceNumber {}
private

The event sequence number.

Definition at line 81 of file OutputStreamSequencerSvc.h.

81{};

◆ m_finishedRange

std::map<std::string,std::string>::iterator OutputStreamSequencerSvc::m_finishedRange {}
private

Definition at line 112 of file OutputStreamSequencerSvc.h.

112{};

◆ m_fnToRangeId

std::map<std::string,std::string> OutputStreamSequencerSvc::m_fnToRangeId
private

Definition at line 111 of file OutputStreamSequencerSvc.h.

◆ m_incidentName

StringProperty OutputStreamSequencerSvc::m_incidentName
private
Initial value:
{this, "SequenceIncidentName", "",
"Name of the incident that signals the next Event Range start" }

SequenceIncidentName, incident name for triggering file sequencing.

Definition at line 97 of file OutputStreamSequencerSvc.h.

97 {this, "SequenceIncidentName", "",
98 "Name of the incident that signals the next Event Range start" };

◆ m_lastFileName

std::string OutputStreamSequencerSvc::m_lastFileName
private

Recently constructed full file name (useful in single threaded processing).

Definition at line 87 of file OutputStreamSequencerSvc.h.

◆ m_lastIncident

std::string OutputStreamSequencerSvc::m_lastIncident
private

Last incident type that was handled.

Definition at line 90 of file OutputStreamSequencerSvc.h.

◆ m_metaDataSvc

ServiceHandle<MetaDataSvc> OutputStreamSequencerSvc::m_metaDataSvc
private

Definition at line 78 of file OutputStreamSequencerSvc.h.

◆ m_mutex

std::mutex OutputStreamSequencerSvc::m_mutex
mutableprivate

Definition at line 114 of file OutputStreamSequencerSvc.h.

◆ m_rangeIDinSlot

SG::SlotSpecificObj<std::string, SG::InvalidSlot::Enabled> OutputStreamSequencerSvc::m_rangeIDinSlot
private

EventRange ID for all slots.

Definition at line 93 of file OutputStreamSequencerSvc.h.

◆ m_replaceRangeMode

BooleanProperty OutputStreamSequencerSvc::m_replaceRangeMode
private
Initial value:
{
this, "ReplaceRangeMode", false,
"If True, everything between [ and ] in the output filename is treated "
"as a comma-separated list, and the range_idth element of the list is "
"selected"}

Flag to put in ReplaceRangeMode (i.e.

everything between [ and ] (inclusive) is replaced with the range_idth element of that list)

Definition at line 105 of file OutputStreamSequencerSvc.h.

105 {
106 this, "ReplaceRangeMode", false,
107 "If True, everything between [ and ] in the output filename is treated "
108 "as a comma-separated list, and the range_idth element of the list is "
109 "selected"};

◆ m_reportingOn

BooleanProperty OutputStreamSequencerSvc::m_reportingOn
private
Initial value:
{this, "ReportingOn", false,
"If True, keep info about Ranges for getRangeReport() calls"}

Flag to switch on storage of reporting info in fnToRangeId.

Definition at line 100 of file OutputStreamSequencerSvc.h.

100 {this, "ReportingOn", false,
101 "If True, keep info about Ranges for getRangeReport() calls"};

The documentation for this class was generated from the following files: