ATLAS Offline Software
Public Types | Public Member Functions | Static Public Member Functions | Private Attributes | List of all members
OutputStreamSequencerSvc Class Reference

This class provides configuration properties to enable OutputStream file sequences. More...

#include <OutputStreamSequencerSvc.h>

Inheritance diagram for OutputStreamSequencerSvc:
Collaboration diagram for OutputStreamSequencerSvc:

Public Types

typedef std::pair< std::string, std::string > RangeReport_t
 
typedef std::unique_ptr< RangeReport_tRangeReport_ptr
 

Public Member Functions

 OutputStreamSequencerSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor. More...
 
virtual ~OutputStreamSequencerSvc ()
 Destructor. More...
 
virtual StatusCode initialize () override final
 Required of all Gaudi services: More...
 
virtual StatusCode finalize () override final
 Required of all Gaudi services: More...
 
virtual void handle (const Incident &) override final
 Incident service handle. More...
 
std::string buildSequenceFileName (const std::string &)
 Returns sequenced file name for output stream. More...
 
void publishRangeReport (const std::string &outputFile)
 
RangeReport_ptr getRangeReport ()
 
std::string incidentName () const
 The name of the incident that starts a new event sequence. More...
 
std::string currentRangeID () const
 The current Event Range ID (only one range is returned) More...
 
std::string setRangeID (const std::string &rangeID)
 set the RangeID (possibly temporarily) so the right Range Filename may be generated More...
 
bool inUse () const
 Is the service in active use? (true after the first range incident is handled) More...
 
const std::string & lastIncident ()
 Last incident type that was handled. More...
 

Static Public Member Functions

static bool inConcurrentEventsMode ()
 Are there concurrent events? (threads>1) More...
 

Private Attributes

ServiceHandle< MetaDataSvcm_metaDataSvc
 
int m_fileSequenceNumber
 The event sequence number. More...
 
std::string m_currentRangeID
 Current EventRange ID constructed on the last NextRange incident. More...
 
std::string m_lastFileName
 Recently constructed full file name (useful in single threaded processing) More...
 
std::string m_lastIncident
 Last incident type that was handled. More...
 
std::vector< std::string > m_rangeIDinSlot
 EventRange ID for all slots. More...
 
StringProperty m_incidentName
 SequenceIncidentName, incident name for triggering file sequencing. More...
 
BooleanProperty m_reportingOn
 Flag to switch on storage of reporting info in fnToRangeId. More...
 
std::map< std::string, std::string > m_fnToRangeId
 
std::map< std::string, std::string >::iterator m_finishedRange
 
std::mutex m_mutex
 

Detailed Description

This class provides configuration properties to enable OutputStream file sequences.

Definition at line 29 of file OutputStreamSequencerSvc.h.

Member Typedef Documentation

◆ RangeReport_ptr

Definition at line 35 of file OutputStreamSequencerSvc.h.

◆ RangeReport_t

typedef std::pair<std::string,std::string> OutputStreamSequencerSvc::RangeReport_t

Definition at line 34 of file OutputStreamSequencerSvc.h.

Constructor & Destructor Documentation

◆ OutputStreamSequencerSvc()

OutputStreamSequencerSvc::OutputStreamSequencerSvc ( const std::string &  name,
ISvcLocator *  pSvcLocator 
)

Standard Service Constructor.

Definition at line 20 of file OutputStreamSequencerSvc.cxx.

21  : base_class(name, pSvcLocator),
22  m_metaDataSvc("MetaDataSvc", name),
24 {
25 }

◆ ~OutputStreamSequencerSvc()

OutputStreamSequencerSvc::~OutputStreamSequencerSvc ( )
virtual

Destructor.

Definition at line 28 of file OutputStreamSequencerSvc.cxx.

28  {
29 }

Member Function Documentation

◆ buildSequenceFileName()

std::string OutputStreamSequencerSvc::buildSequenceFileName ( const std::string &  orgFileName)

Returns sequenced file name for output stream.

Definition at line 160 of file OutputStreamSequencerSvc.cxx.

161 {
162  if( !inUse() ) {
163  // Event sequences not in use, just return the original filename
164  return orgFileName;
165  }
166  std::string rangeID = currentRangeID();
167  std::lock_guard lockg( m_mutex );
168  // build the full output file name for this event range
169  std::string fileNameCore = orgFileName, fileNameExt;
170  std::size_t sepPos = orgFileName.find('[');
171  if (sepPos != std::string::npos) {
172  fileNameCore = orgFileName.substr(0, sepPos);
173  fileNameExt = orgFileName.substr(sepPos);
174  }
175  std::ostringstream n;
176  n << fileNameCore << "." << rangeID << fileNameExt;
177  m_lastFileName = n.str();
178 
179  if( m_reportingOn.value() ) {
180  m_fnToRangeId.insert( std::pair(m_lastFileName, rangeID) );
181  }
182 
183  return m_lastFileName;
184 }

◆ currentRangeID()

std::string OutputStreamSequencerSvc::currentRangeID ( ) const

The current Event Range ID (only one range is returned)

Definition at line 187 of file OutputStreamSequencerSvc.cxx.

188 {
189  if( !inUse() ) return "";
190  auto slot = Gaudi::Hive::currentContext().slot();
191  if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
192  std::lock_guard lockg( m_mutex );
193  if( slot >= m_rangeIDinSlot.size() ) return "";
194  return m_rangeIDinSlot[ slot ];
195 }

◆ finalize()

StatusCode OutputStreamSequencerSvc::finalize ( )
finaloverridevirtual

Required of all Gaudi services:

Definition at line 65 of file OutputStreamSequencerSvc.cxx.

65  {
66  // Release MetaDataSvc
67  if (!m_metaDataSvc.release().isSuccess()) {
68  ATH_MSG_WARNING("Cannot release MetaDataSvc.");
69  }
70  return(StatusCode::SUCCESS);
71 }

◆ getRangeReport()

OutputStreamSequencerSvc::RangeReport_ptr OutputStreamSequencerSvc::getRangeReport ( )

Definition at line 218 of file OutputStreamSequencerSvc.cxx.

219 {
221  if( !m_reportingOn.value() ) {
222  ATH_MSG_WARNING("Reporting not turned on - set " << m_reportingOn.name() << " to True");
223  } else {
224  std::lock_guard lockg( m_mutex );
225  if(m_finishedRange!=m_fnToRangeId.end()) {
226  report = std::make_unique<RangeReport_t>(m_finishedRange->second,m_finishedRange->first);
229  }
230  }
231  return report;
232 }

◆ handle()

void OutputStreamSequencerSvc::handle ( const Incident &  inc)
finaloverridevirtual

Incident service handle.

Definition at line 84 of file OutputStreamSequencerSvc.cxx.

85 {
86  auto slot = Gaudi::Hive::currentContext().slot();
87  bool has_context = ( slot != EventContext::INVALID_CONTEXT_ID );
88  // in AthenaSP there is no context so go with the first slot
89  if( !has_context ) slot = 0;
90  m_lastIncident = inc.type();
91  ATH_MSG_INFO("Handling incident of type " << m_lastIncident << " for slot=" << slot
92  << (!has_context? " NO event context":"") );
93 
94  if( inc.type() == incidentName() ) { // NextEventRange
95  std::string rangeID;
96  const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
97  if (fileInc != nullptr) {
98  rangeID = fileInc->fileName();
99  ATH_MSG_DEBUG("Requested (through incident) Next Event Range filename extension: " << rangeID);
100  }
101 
102  if( rangeID == "dummy" ) {
103  if( not inConcurrentEventsMode() ) {
104  // finish the previous Range here only in SEQUENTIAL (threads<2) event processing
105  // Write metadata on the incident finishing a Range (filename=="dummy") in ES MP
106  ATH_MSG_DEBUG("MetaData transition");
107  // immediate write and disconnect for ES, otherwise do it after Event write is done
108  bool disconnect { true };
109  if( !m_metaDataSvc->transitionMetaDataFile( m_lastFileName, disconnect ).isSuccess() ) {
110  throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
111  }
112  }
113  // exit now, wait for the next (real) incident that will start the next range
114  return;
115  }
116  {
117  // start a new range
118  std::lock_guard lockg( m_mutex );
120  if( rangeID.empty() ) {
121  std::ostringstream n;
122  n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber;
123  rangeID = n.str();
124  ATH_MSG_DEBUG("Default next event range filename extension: " << rangeID);
125  }
126  if( slot >= m_rangeIDinSlot.size() ) {
127  // MN - late resize, is there a better place for it?
128  m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
129  }
130  // from now on new events will use the new rangeID
131  m_currentRangeID = rangeID;
132  // for ESMT these incidents are asynchronous, so wait for BeginProcessing to update the range map
133  if( not inConcurrentEventsMode() or has_context ) {
134  m_rangeIDinSlot[ slot ] = rangeID;
135  }
136  }
137  if( not inConcurrentEventsMode() and not fileInc ) {
138  // non-file incident case (filename=="") in regular SP LoopMgr
139  ATH_MSG_DEBUG("MetaData transition");
140  bool disconnect { false };
141  // MN: may not know the full filename yet, but that is only needed for disconnect==true
142  if( !m_metaDataSvc->transitionMetaDataFile( "" /*m_lastFileName*/, disconnect ).isSuccess() ) {
143  throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
144  }
145  }
146  }
147  else if( inc.type() == IncidentType::BeginProcessing ) {
148  // new event start - assing current rangeId to its slot
149  ATH_MSG_DEBUG("Assigning rangeID = " << m_currentRangeID << " to slot " << slot);
150  std::lock_guard lockg( m_mutex );
151  // If this service is enabled but not getting NextRange incidents, need to resize here
152  if( slot >= m_rangeIDinSlot.size() ) {
153  m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
154  }
156  }
157 }

◆ incidentName()

std::string OutputStreamSequencerSvc::incidentName ( ) const
inline

The name of the incident that starts a new event sequence.

Definition at line 59 of file OutputStreamSequencerSvc.h.

59 { return m_incidentName.value(); }

◆ inConcurrentEventsMode()

bool OutputStreamSequencerSvc::inConcurrentEventsMode ( )
static

Are there concurrent events? (threads>1)

Definition at line 74 of file OutputStreamSequencerSvc.cxx.

74  {
75  return Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() > 1;
76 }

◆ initialize()

StatusCode OutputStreamSequencerSvc::initialize ( )
finaloverridevirtual

Required of all Gaudi services:

Definition at line 31 of file OutputStreamSequencerSvc.cxx.

31  {
32  ATH_MSG_DEBUG("Initializing " << name());
33 
34  // Set to be listener for end of event
35  ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name());
36  if (!incsvc.retrieve().isSuccess()) {
37  ATH_MSG_FATAL("Cannot get IncidentSvc.");
38  return(StatusCode::FAILURE);
39  }
40  if( !incidentName().empty() ) {
41  incsvc->addListener(this, incidentName(), 100);
42  incsvc->addListener(this, IncidentType::BeginProcessing, 100);
43  ATH_MSG_DEBUG("Listening to " << incidentName() << " incidents" );
44  ATH_MSG_DEBUG("Reporting is " << (m_reportingOn.value()? "ON" : "OFF") );
45  // Retrieve MetaDataSvc
46  if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) {
47  ATH_MSG_ERROR("Cannot get MetaDataSvc");
48  return StatusCode::FAILURE;
49  }
50  }
51 
52  if( inConcurrentEventsMode() ) {
53  ATH_MSG_DEBUG("Concurrent events mode");
54  } else {
55  ATH_MSG_VERBOSE("Sequential events mode");
56  }
57 
58  // Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() not set yet
59  // m_rangeIDinSlot.resize( );
61 
62  return(StatusCode::SUCCESS);
63 }

◆ inUse()

bool OutputStreamSequencerSvc::inUse ( ) const

Is the service in active use? (true after the first range incident is handled)

Definition at line 79 of file OutputStreamSequencerSvc.cxx.

79  {
80  return m_fileSequenceNumber >= 0;
81 }

◆ lastIncident()

const std::string& OutputStreamSequencerSvc::lastIncident ( )
inline

Last incident type that was handled.

Definition at line 74 of file OutputStreamSequencerSvc.h.

74 { return m_lastIncident; }

◆ publishRangeReport()

void OutputStreamSequencerSvc::publishRangeReport ( const std::string &  outputFile)

Definition at line 212 of file OutputStreamSequencerSvc.cxx.

213 {
214  std::lock_guard lockg( m_mutex );
216 }

◆ setRangeID()

std::string OutputStreamSequencerSvc::setRangeID ( const std::string &  rangeID)

set the RangeID (possibly temporarily) so the right Range Filename may be generated

Definition at line 198 of file OutputStreamSequencerSvc.cxx.

199 {
200  auto slot = Gaudi::Hive::currentContext().slot();
201  if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
202  std::lock_guard lockg( m_mutex );
203  if( slot >= m_rangeIDinSlot.size() ) {
204  throw std::runtime_error("OutputStreamSequencer::setRangeID(): slot out of range");
205  }
206  std::string oldrange = m_rangeIDinSlot[ slot ];
207  m_rangeIDinSlot[ slot ] = rangeID;
208  return oldrange;
209 }

Member Data Documentation

◆ m_currentRangeID

std::string OutputStreamSequencerSvc::m_currentRangeID
private

Current EventRange ID constructed on the last NextRange incident.

Definition at line 83 of file OutputStreamSequencerSvc.h.

◆ m_fileSequenceNumber

int OutputStreamSequencerSvc::m_fileSequenceNumber
private

The event sequence number.

Definition at line 80 of file OutputStreamSequencerSvc.h.

◆ m_finishedRange

std::map<std::string,std::string>::iterator OutputStreamSequencerSvc::m_finishedRange
private

Definition at line 103 of file OutputStreamSequencerSvc.h.

◆ m_fnToRangeId

std::map<std::string,std::string> OutputStreamSequencerSvc::m_fnToRangeId
private

Definition at line 102 of file OutputStreamSequencerSvc.h.

◆ m_incidentName

StringProperty OutputStreamSequencerSvc::m_incidentName
private
Initial value:
{this, "SequenceIncidentName", "",
"Name of the incident that signals the next Event Range start" }

SequenceIncidentName, incident name for triggering file sequencing.

Definition at line 96 of file OutputStreamSequencerSvc.h.

◆ m_lastFileName

std::string OutputStreamSequencerSvc::m_lastFileName
private

Recently constructed full file name (useful in single threaded processing)

Definition at line 86 of file OutputStreamSequencerSvc.h.

◆ m_lastIncident

std::string OutputStreamSequencerSvc::m_lastIncident
private

Last incident type that was handled.

Definition at line 89 of file OutputStreamSequencerSvc.h.

◆ m_metaDataSvc

ServiceHandle<MetaDataSvc> OutputStreamSequencerSvc::m_metaDataSvc
private

Definition at line 77 of file OutputStreamSequencerSvc.h.

◆ m_mutex

std::mutex OutputStreamSequencerSvc::m_mutex
mutableprivate

Definition at line 105 of file OutputStreamSequencerSvc.h.

◆ m_rangeIDinSlot

std::vector<std::string> OutputStreamSequencerSvc::m_rangeIDinSlot
private

EventRange ID for all slots.

Definition at line 92 of file OutputStreamSequencerSvc.h.

◆ m_reportingOn

BooleanProperty OutputStreamSequencerSvc::m_reportingOn
private
Initial value:
{this, "ReportingOn", false,
"If True, keep info about Ranges for getRangeReport() calls"}

Flag to switch on storage of reporting info in fnToRangeId.

Definition at line 99 of file OutputStreamSequencerSvc.h.


The documentation for this class was generated from the following files:
OutputStreamSequencerSvc::incidentName
std::string incidentName() const
The name of the incident that starts a new event sequence.
Definition: OutputStreamSequencerSvc.h:59
OutputStreamSequencerSvc::inConcurrentEventsMode
static bool inConcurrentEventsMode()
Are there concurrent events? (threads>1)
Definition: OutputStreamSequencerSvc.cxx:74
OutputStreamSequencerSvc::m_lastFileName
std::string m_lastFileName
Recently constructed full file name (useful in single threaded processing)
Definition: OutputStreamSequencerSvc.h:86
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
OutputStreamSequencerSvc::m_fnToRangeId
std::map< std::string, std::string > m_fnToRangeId
Definition: OutputStreamSequencerSvc.h:102
max
constexpr double max()
Definition: ap_fixedTest.cxx:33
checkTP.report
report
Definition: checkTP.py:127
OutputStreamSequencerSvc::inUse
bool inUse() const
Is the service in active use? (true after the first range incident is handled)
Definition: OutputStreamSequencerSvc.cxx:79
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
empty
bool empty(TH1 *h)
Definition: computils.cxx:295
OutputStreamSequencerSvc::m_mutex
std::mutex m_mutex
Definition: OutputStreamSequencerSvc.h:105
OutputStreamSequencerSvc::RangeReport_ptr
std::unique_ptr< RangeReport_t > RangeReport_ptr
Definition: OutputStreamSequencerSvc.h:35
compareGeometries.outputFile
string outputFile
Definition: compareGeometries.py:25
OutputStreamSequencerSvc::m_lastIncident
std::string m_lastIncident
Last incident type that was handled.
Definition: OutputStreamSequencerSvc.h:89
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
beamspotman.n
n
Definition: beamspotman.py:731
OutputStreamSequencerSvc::m_reportingOn
BooleanProperty m_reportingOn
Flag to switch on storage of reporting info in fnToRangeId.
Definition: OutputStreamSequencerSvc.h:99
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
OutputStreamSequencerSvc::m_metaDataSvc
ServiceHandle< MetaDataSvc > m_metaDataSvc
Definition: OutputStreamSequencerSvc.h:77
OutputStreamSequencerSvc::m_rangeIDinSlot
std::vector< std::string > m_rangeIDinSlot
EventRange ID for all slots.
Definition: OutputStreamSequencerSvc.h:92
OutputStreamSequencerSvc::currentRangeID
std::string currentRangeID() const
The current Event Range ID (only one range is returned)
Definition: OutputStreamSequencerSvc.cxx:187
OutputStreamSequencerSvc::m_currentRangeID
std::string m_currentRangeID
Current EventRange ID constructed on the last NextRange incident.
Definition: OutputStreamSequencerSvc.h:83
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
OutputStreamSequencerSvc::m_fileSequenceNumber
int m_fileSequenceNumber
The event sequence number.
Definition: OutputStreamSequencerSvc.h:80
OutputStreamSequencerSvc::m_finishedRange
std::map< std::string, std::string >::iterator m_finishedRange
Definition: OutputStreamSequencerSvc.h:103
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
OutputStreamSequencerSvc::m_incidentName
StringProperty m_incidentName
SequenceIncidentName, incident name for triggering file sequencing.
Definition: OutputStreamSequencerSvc.h:96
ServiceHandle< IIncidentSvc >