ATLAS Offline Software
Public Member Functions | Private Attributes | Friends | List of all members
AthenaRootSharedWriterSvc Class Reference

This class provides an example for writing event data objects to Pool. More...

#include <AthenaRootSharedWriterSvc.h>

Inheritance diagram for AthenaRootSharedWriterSvc:
Collaboration diagram for AthenaRootSharedWriterSvc:

Public Member Functions

 AthenaRootSharedWriterSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor. More...
 
virtual ~AthenaRootSharedWriterSvc ()=default
 Destructor. More...
 
virtual StatusCode initialize () override
 Gaudi Service Interface method implementations: More...
 
virtual StatusCode stop () override
 
virtual StatusCode finalize () override
 
virtual StatusCode share (int numClients=0, bool motherClient=false) override
 

Private Attributes

ServiceHandle< IAthenaPoolCnvSvc > m_cnvSvc {this,"AthenaPoolCnvSvc","AthenaPoolCnvSvc"}
 
TServerSocket * m_rootServerSocket
 
TMonitor * m_rootMonitor
 
THashTable m_rootMergers
 
int m_rootClientIndex
 
int m_rootClientCount
 
int m_numberOfStreams
 

Friends

class SvcFactory< AthenaRootSharedWriterSvc >
 

Detailed Description

This class provides an example for writing event data objects to Pool.

Definition at line 25 of file AthenaRootSharedWriterSvc.h.

Constructor & Destructor Documentation

◆ AthenaRootSharedWriterSvc()

AthenaRootSharedWriterSvc::AthenaRootSharedWriterSvc ( const std::string &  name,
ISvcLocator *  pSvcLocator 
)

Standard Service Constructor.

Definition at line 158 of file AthenaRootSharedWriterSvc.cxx.

159  : base_class(name, pSvcLocator)
161 }

◆ ~AthenaRootSharedWriterSvc()

virtual AthenaRootSharedWriterSvc::~AthenaRootSharedWriterSvc ( )
virtual default

Destructor.

Member Function Documentation

◆ finalize()

StatusCode AthenaRootSharedWriterSvc::finalize ( )
override virtual

Definition at line 325 of file AthenaRootSharedWriterSvc.cxx.

325  {
326  ATH_MSG_INFO("in finalize()");
327  delete m_rootMonitor; m_rootMonitor = nullptr;
328  delete m_rootServerSocket; m_rootServerSocket = nullptr;
329  return StatusCode::SUCCESS;
330 }

◆ initialize()

StatusCode AthenaRootSharedWriterSvc::initialize ( )
override virtual

Gaudi Service Interface method implementations:

Definition at line 163 of file AthenaRootSharedWriterSvc.cxx.

163  {
164  ATH_MSG_INFO("in initialize()");
165 
166  // Initialize IConversionSvc
167  ATH_CHECK(m_cnvSvc.retrieve());
168  IProperty* propertyServer = dynamic_cast<IProperty*>(m_cnvSvc.get());
169  if (propertyServer == nullptr) {
170  ATH_MSG_ERROR("Unable to cast conversion service to IProperty");
171  return StatusCode::FAILURE;
172  } else {
173  std::string propertyName = "ParallelCompression";
174  bool parallelCompression(false);
175  BooleanProperty parallelCompressionProp(propertyName, parallelCompression);
176  if (propertyServer->getProperty(&parallelCompressionProp).isFailure()) {
177  ATH_MSG_INFO("Conversion service does not have ParallelCompression property");
178  } else if (parallelCompressionProp.value()) {
179  int streamPort = 0;
180  propertyName = "StreamPortString";
181  std::string streamPortString("");
182  StringProperty streamPortStringProp(propertyName, streamPortString);
183  if (propertyServer->getProperty(&streamPortStringProp).isFailure()) {
184  ATH_MSG_INFO("Conversion service does not have StreamPortString property, using default: " << streamPort);
185  } else {
186  streamPort = atoi(streamPortStringProp.value().substr(streamPortStringProp.value().find(':') + 1).c_str());
187  }
188  m_rootServerSocket = new TServerSocket(streamPort, (streamPort == 0 ? false : true), 100);
189  if (m_rootServerSocket == nullptr || !m_rootServerSocket->IsValid()) {
190  ATH_MSG_FATAL("Could not create ROOT TServerSocket: " << streamPort);
191  return StatusCode::FAILURE;
192  }
193  streamPort = m_rootServerSocket->GetLocalPort();
194  const std::string newStreamPortString{streamPortStringProp.value().substr(0,streamPortStringProp.value().find(':')+1) + std::to_string(streamPort)};
195  if (propertyServer->setProperty(propertyName,newStreamPortString).isFailure()) {
196  ATH_MSG_FATAL("Could not set Conversion Service property " << propertyName << " from " << streamPortString << " to " << newStreamPortString);
197  return StatusCode::FAILURE;
198  }
199  m_rootMonitor = new TMonitor;
201  ATH_MSG_DEBUG("Successfully created ROOT TServerSocket and added it to TMonitor: ready to accept connections, " << streamPort);
202  }
203  }
204  // Count the number of output streams
205  const IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
206  for (const auto& alg : algMgr->getAlgorithms()) {
207  if (alg->type() == "AthenaOutputStream") {
208  ATH_MSG_DEBUG("Counting " << alg->name() << " as an output stream algorithm");
210  }
211  }
212  if (m_numberOfStreams == 0) {
213  ATH_MSG_WARNING("No output stream algorithm found, setting the number of streams to 1");
214  m_numberOfStreams = 1;
215  } else {
216  ATH_MSG_INFO("Found a total of " << m_numberOfStreams << " output streams");
217  }
218 
219  return StatusCode::SUCCESS;
220 }

◆ share()

StatusCode AthenaRootSharedWriterSvc::share ( int  numClients = 0,
bool  motherClient = false 
)
override virtual

Definition at line 222 of file AthenaRootSharedWriterSvc.cxx.

222  {
223  ATH_MSG_DEBUG("Start commitOutput loop");
224  StatusCode sc = m_cnvSvc->commitOutput("", false);
225 
226  // Allow ROOT clients to start up (by setting active clients)
227  // and wait to stop the ROOT server until all clients are done and metadata is written (commitOutput fail).
228  bool anyActiveClients = (m_rootServerSocket != nullptr);
229  while (sc.isSuccess() || sc.isRecoverable() || anyActiveClients) {
230  if (sc.isSuccess()) {
231  ATH_MSG_VERBOSE("Success in commitOutput loop");
232  } else if (m_rootMonitor != nullptr) {
233  TSocket* socket = m_rootMonitor->Select(1);
234  if (socket != nullptr && socket != (TSocket*)-1) {
235  ATH_MSG_DEBUG("ROOT Monitor got: " << socket);
236  if (socket->IsA() == TServerSocket::Class()) {
237  TSocket* client = ((TServerSocket*)socket)->Accept();
238  client->Send(m_rootClientIndex, 0);
239  client->Send(1, 1);
242  if (m_rootClientCount < (numClients-1)*m_numberOfStreams + 1) {
243  m_rootMonitor->Add(client);
244  ATH_MSG_INFO("ROOT Monitor add client: " << m_rootClientIndex << ", " << client);
245  } else {
246  ATH_MSG_WARNING("ROOT Monitor do NOT add client: " << m_rootClientIndex << ", " << client);
247  client->Close("force");
249  }
250  } else {
251  TMessage* message = nullptr;
252  Int_t result = socket->Recv(message);
253  if (result < 0) {
254  ATH_MSG_ERROR("ROOT Monitor got an error while receiving the message from the socket: " << result);
255  return StatusCode::FAILURE;
256  }
257  if (message == nullptr) {
258  ATH_MSG_WARNING("ROOT Monitor got no message from socket: " << socket);
259  } else if (message->What() == kMESS_STRING) {
260  char str[64];
261  message->ReadString(str, 64);
262  ATH_MSG_INFO("ROOT Monitor client: " << socket << ", " << str);
263  m_rootMonitor->Remove(socket);
264  ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << socket->GetBytesRecv() << ", " << socket->GetBytesSent());
265  socket->Close();
267  if (m_rootMonitor->GetActive() == 0 || m_rootClientCount == 0) {
268  if (!motherClient) {
269  anyActiveClients = false;
270  ATH_MSG_INFO("ROOT Monitor: No more active clients...");
271  } else {
272  motherClient = false;
273  ATH_MSG_INFO("ROOT Monitor: Mother process is done...");
274  if (!m_cnvSvc->commitCatalog().isSuccess()) {
275  ATH_MSG_FATAL("Failed to commit file catalog.");
276  return StatusCode::FAILURE;
277  }
278  }
279  }
280  } else if (message->What() == kMESS_ANY) {
281  long long length;
282  TString filename;
283  int clientId;
284  message->ReadInt(clientId);
285  message->ReadTString(filename);
286  message->ReadLong64(length);
287  ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << clientId << ": " << filename << ", " << length);
288  std::unique_ptr<TMemFile> transient(new TMemFile(filename, message->Buffer() + message->Length(), length, "UPDATE"));
289  message->SetBufferOffset(message->Length() + length);
290  ParallelFileMerger* info = static_cast<ParallelFileMerger*>(m_rootMergers.FindObject(filename));
291  if (!info) {
292  info = new ParallelFileMerger(filename, transient->GetCompressionSettings());
293  m_rootMergers.Add(info);
294  ATH_MSG_INFO("ROOT Monitor ParallelFileMerger: " << info << ", for: " << filename);
295  }
296  info->MergeTrees(transient.get());
297  }
298  delete message; message = nullptr;
299  }
300  }
301  } else if (m_rootMonitor == nullptr) {
302  usleep(100);
303  }
304  // Once commitOutput failed all legacy clients are finished (writing metadata), do not call again.
305  if (sc.isSuccess() || sc.isRecoverable()) {
306  sc = m_cnvSvc->commitOutput("", false);
307  if (sc.isFailure() && !sc.isRecoverable()) {
308  ATH_MSG_INFO("commitOutput failed, metadata done.");
309  if (anyActiveClients && m_rootClientCount == 0) {
310  ATH_MSG_INFO("ROOT Monitor: No clients, terminating the loop...");
311  anyActiveClients = false;
312  }
313  }
314  }
315  }
316  ATH_MSG_INFO("End commitOutput loop");
317  return StatusCode::SUCCESS;
318 }

◆ stop()

StatusCode AthenaRootSharedWriterSvc::stop ( )
override virtual

Definition at line 320 of file AthenaRootSharedWriterSvc.cxx.

320  {
321  m_rootMergers.Delete();
322  return StatusCode::SUCCESS;
323 }

Friends And Related Function Documentation

◆ SvcFactory< AthenaRootSharedWriterSvc >

friend class SvcFactory< AthenaRootSharedWriterSvc >
friend

Definition at line 1 of file AthenaRootSharedWriterSvc.h.

Member Data Documentation

◆ m_cnvSvc

ServiceHandle<IAthenaPoolCnvSvc> AthenaRootSharedWriterSvc::m_cnvSvc {this,"AthenaPoolCnvSvc","AthenaPoolCnvSvc"}
private

Definition at line 44 of file AthenaRootSharedWriterSvc.h.

◆ m_numberOfStreams

int AthenaRootSharedWriterSvc::m_numberOfStreams
private

Definition at line 51 of file AthenaRootSharedWriterSvc.h.

◆ m_rootClientCount

int AthenaRootSharedWriterSvc::m_rootClientCount
private

Definition at line 50 of file AthenaRootSharedWriterSvc.h.

◆ m_rootClientIndex

int AthenaRootSharedWriterSvc::m_rootClientIndex
private

Definition at line 49 of file AthenaRootSharedWriterSvc.h.

◆ m_rootMergers

THashTable AthenaRootSharedWriterSvc::m_rootMergers
private

Definition at line 48 of file AthenaRootSharedWriterSvc.h.

◆ m_rootMonitor

TMonitor* AthenaRootSharedWriterSvc::m_rootMonitor
private

Definition at line 47 of file AthenaRootSharedWriterSvc.h.

◆ m_rootServerSocket

TServerSocket* AthenaRootSharedWriterSvc::m_rootServerSocket
private

Definition at line 46 of file AthenaRootSharedWriterSvc.h.


The documentation for this class was generated from the following files:
AthenaRootSharedWriterSvc.h
AthenaRootSharedWriterSvc.cxx
grepfile.info
info
Definition: grepfile.py:38
AthenaRootSharedWriterSvc::m_rootClientCount
int m_rootClientCount
Definition: AthenaRootSharedWriterSvc.h:50
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
SGout2dot.alg
alg
Definition: SGout2dot.py:243
get_generator_info.result
result
Definition: get_generator_info.py:21
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
ReweightUtils.message
message
Definition: ReweightUtils.py:15
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
rerun_display.client
client
Definition: rerun_display.py:31
AthenaRootSharedWriterSvc::m_rootClientIndex
int m_rootClientIndex
Definition: AthenaRootSharedWriterSvc.h:49
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaRootSharedWriterSvc::m_rootServerSocket
TServerSocket * m_rootServerSocket
Definition: AthenaRootSharedWriterSvc.h:46
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
ActsTrk::to_string
std::string to_string(const DetectorType &type)
Definition: GeometryDefs.h:34
AthenaRootSharedWriterSvc::m_rootMergers
THashTable m_rootMergers
Definition: AthenaRootSharedWriterSvc.h:48
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaRootSharedWriterSvc::m_rootMonitor
TMonitor * m_rootMonitor
Definition: AthenaRootSharedWriterSvc.h:47
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:24
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
ParallelFileMerger
Definition: AthenaRootSharedWriterSvc.cxx:56
str
Definition: BTagTrackIpAccessor.cxx:11
AthenaRootSharedWriterSvc::m_numberOfStreams
int m_numberOfStreams
Definition: AthenaRootSharedWriterSvc.h:51
AthenaRootSharedWriterSvc::m_cnvSvc
ServiceHandle< IAthenaPoolCnvSvc > m_cnvSvc
Definition: AthenaRootSharedWriterSvc.h:44
length
double length(const pvec &v)
Definition: FPGATrackSimLLPDoubletHoughTransformTool.cxx:26