ATLAS Offline Software
Public Types | Public Member Functions | Static Public Member Functions | Private Attributes | List of all members
OutputStreamSequencerSvc Class Reference

This class provides configuration properties to enable OutputStream file sequences. More...

#include <OutputStreamSequencerSvc.h>

Inheritance diagram for OutputStreamSequencerSvc:
Collaboration diagram for OutputStreamSequencerSvc:

Public Types

typedef std::pair< std::string, std::string > RangeReport_t
 
typedef std::unique_ptr< RangeReport_tRangeReport_ptr
 

Public Member Functions

 OutputStreamSequencerSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor. More...
 
virtual ~OutputStreamSequencerSvc ()
 Destructor. More...
 
virtual StatusCode initialize () override final
 Required of all Gaudi services: More...
 
virtual StatusCode finalize () override final
 Required of all Gaudi services: More...
 
virtual StatusCode queryInterface (const InterfaceID &riid, void **ppvInterface) override final
 Required of all Gaudi services: see Gaudi documentation for details. More...
 
virtual void handle (const Incident &) override final
 Incident service handle. More...
 
std::string buildSequenceFileName (const std::string &)
 Returns sequenced file name for output stream. More...
 
void publishRangeReport (const std::string &outputFile)
 
RangeReport_ptr getRangeReport ()
 
std::string incidentName () const
 The name of the incident that starts a new event sequence. More...
 
std::string currentRangeID () const
 The current Event Range ID (only one range is returned) More...
 
std::string setRangeID (const std::string &rangeID)
 set the RangeID (possibly temporarily) so the right Range Filename may be generated More...
 
bool inUse () const
 Is the service in active use? (true after the first range incident is handled) More...
 
const std::string & lastIncident ()
 Last incident type that was handled. More...
 
MsgStream & msg () const
 
MsgStream & msg (const MSG::Level lvl) const
 
bool msgLvl (const MSG::Level lvl) const
 

Static Public Member Functions

static const InterfaceID & interfaceID ()
 Retrieve interface ID. More...
 
static bool inConcurrentEventsMode ()
 Are there concurrent events? (threads>1) More...
 

Private Attributes

ServiceHandle< MetaDataSvcm_metaDataSvc
 
int m_fileSequenceNumber
 The event sequence number. More...
 
std::string m_currentRangeID
 Current EventRange ID constructed on the last NextRange incident. More...
 
std::string m_lastFileName
 Recently constructed full file name (useful in single threaded processing) More...
 
std::string m_lastIncident
 Last incident type that was handled. More...
 
std::vector< std::string > m_rangeIDinSlot
 EventRange ID for all slots. More...
 
StringProperty m_incidentName
 SequenceIncidentName, incident name for triggering file sequencing. More...
 
BooleanProperty m_reportingOn
 Flag to switch on storage of reporting info in fnToRangeId. More...
 
std::map< std::string, std::string > m_fnToRangeId
 
std::map< std::string, std::string >::iterator m_finishedRange
 
std::mutex m_mutex
 

Detailed Description

This class provides configuration properties to enable OutputStream file sequences.

Definition at line 32 of file OutputStreamSequencerSvc.h.

Member Typedef Documentation

◆ RangeReport_ptr

Definition at line 37 of file OutputStreamSequencerSvc.h.

◆ RangeReport_t

typedef std::pair<std::string,std::string> OutputStreamSequencerSvc::RangeReport_t

Definition at line 36 of file OutputStreamSequencerSvc.h.

Constructor & Destructor Documentation

◆ OutputStreamSequencerSvc()

OutputStreamSequencerSvc::OutputStreamSequencerSvc ( const std::string &  name,
ISvcLocator *  pSvcLocator 
)

Standard Service Constructor.

Definition at line 20 of file OutputStreamSequencerSvc.cxx.

20  : ::AthService(name, pSvcLocator),
21  m_metaDataSvc("MetaDataSvc", name),
23 {
24 }

◆ ~OutputStreamSequencerSvc()

OutputStreamSequencerSvc::~OutputStreamSequencerSvc ( )
virtual

Destructor.

Definition at line 27 of file OutputStreamSequencerSvc.cxx.

27  {
28 }

Member Function Documentation

◆ buildSequenceFileName()

std::string OutputStreamSequencerSvc::buildSequenceFileName ( const std::string &  orgFileName)

Returns sequenced file name for output stream.

Definition at line 170 of file OutputStreamSequencerSvc.cxx.

171 {
172  if( !inUse() ) {
173  // Event sequences not in use, just return the original filename
174  return orgFileName;
175  }
176  std::string rangeID = currentRangeID();
177  std::lock_guard lockg( m_mutex );
178  // build the full output file name for this event range
179  std::string fileNameCore = orgFileName, fileNameExt;
180  std::size_t sepPos = orgFileName.find('[');
181  if (sepPos != std::string::npos) {
182  fileNameCore = orgFileName.substr(0, sepPos);
183  fileNameExt = orgFileName.substr(sepPos);
184  }
185  std::ostringstream n;
186  n << fileNameCore << "." << rangeID << fileNameExt;
187  m_lastFileName = n.str();
188 
189  if( m_reportingOn.value() ) {
190  m_fnToRangeId.insert( std::pair(m_lastFileName, rangeID) );
191  }
192 
193  return m_lastFileName;
194 }

◆ currentRangeID()

std::string OutputStreamSequencerSvc::currentRangeID ( ) const

The current Event Range ID (only one range is returned)

Definition at line 197 of file OutputStreamSequencerSvc.cxx.

198 {
199  if( !inUse() ) return "";
200  auto slot = Gaudi::Hive::currentContext().slot();
201  if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
202  std::lock_guard lockg( m_mutex );
203  if( slot >= m_rangeIDinSlot.size() ) return "";
204  return m_rangeIDinSlot[ slot ];
205 }

◆ finalize()

StatusCode OutputStreamSequencerSvc::finalize ( )
finaloverridevirtual

Required of all Gaudi services:

Definition at line 64 of file OutputStreamSequencerSvc.cxx.

64  {
65  // Release MetaDataSvc
66  if (!m_metaDataSvc.release().isSuccess()) {
67  ATH_MSG_WARNING("Cannot release MetaDataSvc.");
68  }
69  return(StatusCode::SUCCESS);
70 }

◆ getRangeReport()

OutputStreamSequencerSvc::RangeReport_ptr OutputStreamSequencerSvc::getRangeReport ( )

Definition at line 228 of file OutputStreamSequencerSvc.cxx.

229 {
231  if( !m_reportingOn.value() ) {
232  ATH_MSG_WARNING("Reporting not turned on - set " << m_reportingOn.name() << " to True");
233  } else {
234  std::lock_guard lockg( m_mutex );
235  if(m_finishedRange!=m_fnToRangeId.end()) {
236  report = std::make_unique<RangeReport_t>(m_finishedRange->second,m_finishedRange->first);
239  }
240  }
241  return report;
242 }

◆ handle()

void OutputStreamSequencerSvc::handle ( const Incident &  inc)
finaloverridevirtual

Incident service handle.

Definition at line 94 of file OutputStreamSequencerSvc.cxx.

95 {
96  auto slot = Gaudi::Hive::currentContext().slot();
97  bool has_context = ( slot != EventContext::INVALID_CONTEXT_ID );
98  // in AthenaSP there is no context so go with the first slot
99  if( !has_context ) slot = 0;
100  m_lastIncident = inc.type();
101  ATH_MSG_INFO("Handling incident of type " << m_lastIncident << " for slot=" << slot
102  << (!has_context? " NO event context":"") );
103 
104  if( inc.type() == incidentName() ) { // NextEventRange
105  std::string rangeID;
106  const FileIncident* fileInc = dynamic_cast<const FileIncident*>(&inc);
107  if (fileInc != nullptr) {
108  rangeID = fileInc->fileName();
109  ATH_MSG_DEBUG("Requested (through incident) Next Event Range filename extension: " << rangeID);
110  }
111 
112  if( rangeID == "dummy" ) {
113  if( not inConcurrentEventsMode() ) {
114  // finish the previous Range here only in SEQUENTIAL (threads<2) event processing
115  // Write metadata on the incident finishing a Range (filename=="dummy") in ES MP
116  ATH_MSG_DEBUG("MetaData transition");
117  // immediate write and disconnect for ES, otherwise do it after Event write is done
118  bool disconnect { true };
119  if( !m_metaDataSvc->transitionMetaDataFile( m_lastFileName, disconnect ).isSuccess() ) {
120  throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
121  }
122  }
123  // exit now, wait for the next (real) incident that will start the next range
124  return;
125  }
126  {
127  // start a new range
128  std::lock_guard lockg( m_mutex );
130  if( rangeID.empty() ) {
131  std::ostringstream n;
132  n << "_" << std::setw(4) << std::setfill('0') << m_fileSequenceNumber;
133  rangeID = n.str();
134  ATH_MSG_DEBUG("Default next event range filename extension: " << rangeID);
135  }
136  if( slot >= m_rangeIDinSlot.size() ) {
137  // MN - late resize, is there a better place for it?
138  m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
139  }
140  // from now on new events will use the new rangeID
141  m_currentRangeID = rangeID;
142  // for ESMT these incidents are asynchronous, so wait for BeginProcessing to update the range map
143  if( not inConcurrentEventsMode() or has_context ) {
144  m_rangeIDinSlot[ slot ] = rangeID;
145  }
146  }
147  if( not inConcurrentEventsMode() and not fileInc ) {
148  // non-file incident case (filename=="") in regular SP LoopMgr
149  ATH_MSG_DEBUG("MetaData transition");
150  bool disconnect { false };
151  // MN: may not know the full filename yet, but that is only needed for disconnect==true
152  if( !m_metaDataSvc->transitionMetaDataFile( "" /*m_lastFileName*/, disconnect ).isSuccess() ) {
153  throw GaudiException("Cannot transition MetaData", name(), StatusCode::FAILURE);
154  }
155  }
156  }
157  else if( inc.type() == IncidentType::BeginProcessing ) {
158  // new event start - assing current rangeId to its slot
159  ATH_MSG_DEBUG("Assigning rangeID = " << m_currentRangeID << " to slot " << slot);
160  std::lock_guard lockg( m_mutex );
161  // If this service is enabled but not getting NextRange incidents, need to resize here
162  if( slot >= m_rangeIDinSlot.size() ) {
163  m_rangeIDinSlot.resize( std::max(slot+1, Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents()) );
164  }
166  }
167 }

◆ incidentName()

std::string OutputStreamSequencerSvc::incidentName ( ) const
inline

The name of the incident that starts a new event sequence.

Definition at line 65 of file OutputStreamSequencerSvc.h.

65 { return m_incidentName.value(); }

◆ inConcurrentEventsMode()

bool OutputStreamSequencerSvc::inConcurrentEventsMode ( )
static

Are there concurrent events? (threads>1)

Definition at line 84 of file OutputStreamSequencerSvc.cxx.

84  {
85  return Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() > 1;
86 }

◆ initialize()

StatusCode OutputStreamSequencerSvc::initialize ( )
finaloverridevirtual

Required of all Gaudi services:

Definition at line 30 of file OutputStreamSequencerSvc.cxx.

30  {
31  ATH_MSG_DEBUG("Initializing " << name());
32 
33  // Set to be listener for end of event
34  ServiceHandle<IIncidentSvc> incsvc("IncidentSvc", this->name());
35  if (!incsvc.retrieve().isSuccess()) {
36  ATH_MSG_FATAL("Cannot get IncidentSvc.");
37  return(StatusCode::FAILURE);
38  }
39  if( !incidentName().empty() ) {
40  incsvc->addListener(this, incidentName(), 100);
41  incsvc->addListener(this, IncidentType::BeginProcessing, 100);
42  ATH_MSG_DEBUG("Listening to " << incidentName() << " incidents" );
43  ATH_MSG_DEBUG("Reporting is " << (m_reportingOn.value()? "ON" : "OFF") );
44  // Retrieve MetaDataSvc
45  if( !m_metaDataSvc.isValid() and !m_metaDataSvc.retrieve().isSuccess() ) {
46  ATH_MSG_ERROR("Cannot get MetaDataSvc");
47  return StatusCode::FAILURE;
48  }
49  }
50 
51  if( inConcurrentEventsMode() ) {
52  ATH_MSG_DEBUG("Concurrent events mode");
53  } else {
54  ATH_MSG_VERBOSE("Sequential events mode");
55  }
56 
57  // Gaudi::Concurrency::ConcurrencyFlags::numConcurrentEvents() not set yet
58  // m_rangeIDinSlot.resize( );
60 
61  return(StatusCode::SUCCESS);
62 }

◆ interfaceID()

static const InterfaceID& OutputStreamSequencerSvc::interfaceID ( )
inlinestatic

Retrieve interface ID.

Definition at line 46 of file OutputStreamSequencerSvc.h.

46 { return IID_OutputStreamSequencerSvc; }

◆ inUse()

bool OutputStreamSequencerSvc::inUse ( ) const

Is the service in active use? (true after the first range incident is handled)

Definition at line 89 of file OutputStreamSequencerSvc.cxx.

89  {
90  return m_fileSequenceNumber >= 0;
91 }

◆ lastIncident()

const std::string& OutputStreamSequencerSvc::lastIncident ( )
inline

Last incident type that was handled.

Definition at line 80 of file OutputStreamSequencerSvc.h.

80 { return m_lastIncident; }

◆ msg() [1/2]

MsgStream& AthCommonMsg< Service >::msg ( ) const
inlineinherited

Definition at line 24 of file AthCommonMsg.h.

24  {
25  return this->msgStream();
26  }

◆ msg() [2/2]

MsgStream& AthCommonMsg< Service >::msg ( const MSG::Level  lvl) const
inlineinherited

Definition at line 27 of file AthCommonMsg.h.

27  {
28  return this->msgStream(lvl);
29  }

◆ msgLvl()

bool AthCommonMsg< Service >::msgLvl ( const MSG::Level  lvl) const
inlineinherited

Definition at line 30 of file AthCommonMsg.h.

30  {
31  return this->msgLevel(lvl);
32  }

◆ publishRangeReport()

void OutputStreamSequencerSvc::publishRangeReport ( const std::string &  outputFile)

Definition at line 222 of file OutputStreamSequencerSvc.cxx.

223 {
224  std::lock_guard lockg( m_mutex );
226 }

◆ queryInterface()

StatusCode OutputStreamSequencerSvc::queryInterface ( const InterfaceID &  riid,
void **  ppvInterface 
)
finaloverridevirtual

Required of all Gaudi services: see Gaudi documentation for details.

Definition at line 72 of file OutputStreamSequencerSvc.cxx.

72  {
73  if (riid == this->interfaceID()) {
74  *ppvInterface = this;
75  } else {
76  // Interface is not directly available: try out a base class
77  return(::AthService::queryInterface(riid, ppvInterface));
78  }
79  addRef();
80  return(StatusCode::SUCCESS);
81 }

◆ setRangeID()

std::string OutputStreamSequencerSvc::setRangeID ( const std::string &  rangeID)

set the RangeID (possibly temporarily) so the right Range Filename may be generated

Definition at line 208 of file OutputStreamSequencerSvc.cxx.

209 {
210  auto slot = Gaudi::Hive::currentContext().slot();
211  if( slot == EventContext::INVALID_CONTEXT_ID ) slot = 0;
212  std::lock_guard lockg( m_mutex );
213  if( slot >= m_rangeIDinSlot.size() ) {
214  throw std::runtime_error("OutputStreamSequencer::setRangeID(): slot out of range");
215  }
216  std::string oldrange = m_rangeIDinSlot[ slot ];
217  m_rangeIDinSlot[ slot ] = rangeID;
218  return oldrange;
219 }

Member Data Documentation

◆ m_currentRangeID

std::string OutputStreamSequencerSvc::m_currentRangeID
private

Current EventRange ID constructed on the last NextRange incident.

Definition at line 89 of file OutputStreamSequencerSvc.h.

◆ m_fileSequenceNumber

int OutputStreamSequencerSvc::m_fileSequenceNumber
private

The event sequence number.

Definition at line 86 of file OutputStreamSequencerSvc.h.

◆ m_finishedRange

std::map<std::string,std::string>::iterator OutputStreamSequencerSvc::m_finishedRange
private

Definition at line 109 of file OutputStreamSequencerSvc.h.

◆ m_fnToRangeId

std::map<std::string,std::string> OutputStreamSequencerSvc::m_fnToRangeId
private

Definition at line 108 of file OutputStreamSequencerSvc.h.

◆ m_incidentName

StringProperty OutputStreamSequencerSvc::m_incidentName
private
Initial value:
{this, "SequenceIncidentName", "",
"Name of the incident that signals the next Event Range start" }

SequenceIncidentName, incident name for triggering file sequencing.

Definition at line 102 of file OutputStreamSequencerSvc.h.

◆ m_lastFileName

std::string OutputStreamSequencerSvc::m_lastFileName
private

Recently constructed full file name (useful in single threaded processing)

Definition at line 92 of file OutputStreamSequencerSvc.h.

◆ m_lastIncident

std::string OutputStreamSequencerSvc::m_lastIncident
private

Last incident type that was handled.

Definition at line 95 of file OutputStreamSequencerSvc.h.

◆ m_metaDataSvc

ServiceHandle<MetaDataSvc> OutputStreamSequencerSvc::m_metaDataSvc
private

Definition at line 83 of file OutputStreamSequencerSvc.h.

◆ m_mutex

std::mutex OutputStreamSequencerSvc::m_mutex
mutableprivate

Definition at line 111 of file OutputStreamSequencerSvc.h.

◆ m_rangeIDinSlot

std::vector<std::string> OutputStreamSequencerSvc::m_rangeIDinSlot
private

EventRange ID for all slots.

Definition at line 98 of file OutputStreamSequencerSvc.h.

◆ m_reportingOn

BooleanProperty OutputStreamSequencerSvc::m_reportingOn
private
Initial value:
{this, "ReportingOn", false,
"If True, keep info about Ranges for getRangeReport() calls"}

Flag to switch on storage of reporting info in fnToRangeId.

Definition at line 105 of file OutputStreamSequencerSvc.h.


The documentation for this class was generated from the following files:
AthService::AthService
AthService()
OutputStreamSequencerSvc::incidentName
std::string incidentName() const
The name of the incident that starts a new event sequence.
Definition: OutputStreamSequencerSvc.h:65
OutputStreamSequencerSvc::inConcurrentEventsMode
static bool inConcurrentEventsMode()
Are there concurrent events? (threads>1)
Definition: OutputStreamSequencerSvc.cxx:84
OutputStreamSequencerSvc::m_lastFileName
std::string m_lastFileName
Recently constructed full file name (useful in single threaded processing)
Definition: OutputStreamSequencerSvc.h:92
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
max
#define max(a, b)
Definition: cfImp.cxx:41
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
OutputStreamSequencerSvc::m_fnToRangeId
std::map< std::string, std::string > m_fnToRangeId
Definition: OutputStreamSequencerSvc.h:108
checkTP.report
report
Definition: checkTP.py:127
OutputStreamSequencerSvc::inUse
bool inUse() const
Is the service in active use? (true after the first range incident is handled)
Definition: OutputStreamSequencerSvc.cxx:89
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
empty
bool empty(TH1 *h)
Definition: computils.cxx:294
OutputStreamSequencerSvc::m_mutex
std::mutex m_mutex
Definition: OutputStreamSequencerSvc.h:111
OutputStreamSequencerSvc::RangeReport_ptr
std::unique_ptr< RangeReport_t > RangeReport_ptr
Definition: OutputStreamSequencerSvc.h:37
compareGeometries.outputFile
string outputFile
Definition: compareGeometries.py:25
OutputStreamSequencerSvc::m_lastIncident
std::string m_lastIncident
Last incident type that was handled.
Definition: OutputStreamSequencerSvc.h:95
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
beamspotman.n
n
Definition: beamspotman.py:731
OutputStreamSequencerSvc::m_reportingOn
BooleanProperty m_reportingOn
Flag to switch on storage of reporting info in fnToRangeId.
Definition: OutputStreamSequencerSvc.h:105
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
OutputStreamSequencerSvc::m_metaDataSvc
ServiceHandle< MetaDataSvc > m_metaDataSvc
Definition: OutputStreamSequencerSvc.h:83
OutputStreamSequencerSvc::m_rangeIDinSlot
std::vector< std::string > m_rangeIDinSlot
EventRange ID for all slots.
Definition: OutputStreamSequencerSvc.h:98
OutputStreamSequencerSvc::currentRangeID
std::string currentRangeID() const
The current Event Range ID (only one range is returned)
Definition: OutputStreamSequencerSvc.cxx:197
OutputStreamSequencerSvc::m_currentRangeID
std::string m_currentRangeID
Current EventRange ID constructed on the last NextRange incident.
Definition: OutputStreamSequencerSvc.h:89
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:195
OutputStreamSequencerSvc::m_fileSequenceNumber
int m_fileSequenceNumber
The event sequence number.
Definition: OutputStreamSequencerSvc.h:86
OutputStreamSequencerSvc::m_finishedRange
std::map< std::string, std::string >::iterator m_finishedRange
Definition: OutputStreamSequencerSvc.h:109
OutputStreamSequencerSvc::interfaceID
static const InterfaceID & interfaceID()
Retrieve interface ID.
Definition: OutputStreamSequencerSvc.h:46
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
OutputStreamSequencerSvc::m_incidentName
StringProperty m_incidentName
SequenceIncidentName, incident name for triggering file sequencing.
Definition: OutputStreamSequencerSvc.h:102
ServiceHandle< IIncidentSvc >