ATLAS Offline Software
Public Member Functions | Private Attributes | Friends | List of all members
AthenaRootSharedWriterSvc Class Reference

This class provides an example for writing event data objects to Pool. More...

#include <AthenaRootSharedWriterSvc.h>

Inheritance diagram for AthenaRootSharedWriterSvc:
Collaboration diagram for AthenaRootSharedWriterSvc:

Public Member Functions

 AthenaRootSharedWriterSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor. More...
 
virtual ~AthenaRootSharedWriterSvc ()=default
 Destructor. More...
 
virtual StatusCode initialize () override
 Gaudi Service Interface method implementations: More...
 
virtual StatusCode stop () override
 
virtual StatusCode finalize () override
 
virtual StatusCode share (int numClients=0, bool motherClient=false) override
 

Private Attributes

ServiceHandle< IAthenaPoolCnvSvc > m_cnvSvc {this,"AthenaPoolCnvSvc","AthenaPoolCnvSvc"}
 
TServerSocket * m_rootServerSocket
 
TMonitor * m_rootMonitor
 
THashTable m_rootMergers
 
int m_rootClientIndex
 
int m_rootClientCount
 
int m_numberOfStreams
 

Friends

class SvcFactory< AthenaRootSharedWriterSvc >
 

Detailed Description

This class provides an example for writing event data objects to Pool.

Definition at line 25 of file AthenaRootSharedWriterSvc.h.

Constructor & Destructor Documentation

◆ AthenaRootSharedWriterSvc()

AthenaRootSharedWriterSvc::AthenaRootSharedWriterSvc ( const std::string &  name,
ISvcLocator *  pSvcLocator 
)

Standard Service Constructor.

Definition at line 157 of file AthenaRootSharedWriterSvc.cxx.

158  : base_class(name, pSvcLocator)
160 }

◆ ~AthenaRootSharedWriterSvc()

virtual AthenaRootSharedWriterSvc::~AthenaRootSharedWriterSvc ( )
virtual, default

Destructor.

Member Function Documentation

◆ finalize()

StatusCode AthenaRootSharedWriterSvc::finalize ( )
override, virtual

Definition at line 324 of file AthenaRootSharedWriterSvc.cxx.

324  {
325  ATH_MSG_INFO("in finalize()");
326  delete m_rootMonitor; m_rootMonitor = nullptr;
327  delete m_rootServerSocket; m_rootServerSocket = nullptr;
328  return StatusCode::SUCCESS;
329 }

◆ initialize()

StatusCode AthenaRootSharedWriterSvc::initialize ( )
override, virtual

Gaudi Service Interface method implementations:

Definition at line 162 of file AthenaRootSharedWriterSvc.cxx.

162  {
163  ATH_MSG_INFO("in initialize()");
164 
165  // Initialize IConversionSvc
166  ATH_CHECK(m_cnvSvc.retrieve());
167  IProperty* propertyServer = dynamic_cast<IProperty*>(m_cnvSvc.get());
168  if (propertyServer == nullptr) {
169  ATH_MSG_ERROR("Unable to cast conversion service to IProperty");
170  return StatusCode::FAILURE;
171  } else {
172  std::string propertyName = "ParallelCompression";
173  bool parallelCompression(false);
174  BooleanProperty parallelCompressionProp(propertyName, parallelCompression);
175  if (propertyServer->getProperty(&parallelCompressionProp).isFailure()) {
176  ATH_MSG_INFO("Conversion service does not have ParallelCompression property");
177  } else if (parallelCompressionProp.value()) {
178  int streamPort = 0;
179  propertyName = "StreamPortString";
180  std::string streamPortString("");
181  StringProperty streamPortStringProp(propertyName, streamPortString);
182  if (propertyServer->getProperty(&streamPortStringProp).isFailure()) {
183  ATH_MSG_INFO("Conversion service does not have StreamPortString property, using default: " << streamPort);
184  } else {
185  streamPort = atoi(streamPortStringProp.value().substr(streamPortStringProp.value().find(':') + 1).c_str());
186  }
187  m_rootServerSocket = new TServerSocket(streamPort, (streamPort == 0 ? false : true), 100);
188  if (m_rootServerSocket == nullptr || !m_rootServerSocket->IsValid()) {
189  ATH_MSG_FATAL("Could not create ROOT TServerSocket: " << streamPort);
190  return StatusCode::FAILURE;
191  }
192  streamPort = m_rootServerSocket->GetLocalPort();
193  const std::string newStreamPortString{streamPortStringProp.value().substr(0,streamPortStringProp.value().find(':')+1) + std::to_string(streamPort)};
194  if (propertyServer->setProperty(propertyName,newStreamPortString).isFailure()) {
195  ATH_MSG_FATAL("Could not set Conversion Service property " << propertyName << " from " << streamPortString << " to " << newStreamPortString);
196  return StatusCode::FAILURE;
197  }
198  m_rootMonitor = new TMonitor;
200  ATH_MSG_DEBUG("Successfully created ROOT TServerSocket and added it to TMonitor: ready to accept connections, " << streamPort);
201  }
202  }
203  // Count the number of output streams
204  const IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
205  for (const auto& alg : algMgr->getAlgorithms()) {
206  if (alg->type() == "AthenaOutputStream") {
207  ATH_MSG_DEBUG("Counting " << alg->name() << " as an output stream algorithm");
209  }
210  }
211  if (m_numberOfStreams == 0) {
212  ATH_MSG_WARNING("No output stream algorithm found, setting the number of streams to 1");
213  m_numberOfStreams = 1;
214  } else {
215  ATH_MSG_INFO("Found a total of " << m_numberOfStreams << " output streams");
216  }
217 
218  return StatusCode::SUCCESS;
219 }

◆ share()

StatusCode AthenaRootSharedWriterSvc::share ( int  numClients = 0,
bool  motherClient = false 
)
override, virtual

Definition at line 221 of file AthenaRootSharedWriterSvc.cxx.

221  {
222  ATH_MSG_DEBUG("Start commitOutput loop");
223  StatusCode sc = m_cnvSvc->commitOutput("", false);
224 
225  // Allow ROOT clients to start up (by setting active clients)
226  // and wait to stop the ROOT server until all clients are done and metadata is written (commitOutput fail).
227  bool anyActiveClients = (m_rootServerSocket != nullptr);
228  while (sc.isSuccess() || sc.isRecoverable() || anyActiveClients) {
229  if (sc.isSuccess()) {
230  ATH_MSG_VERBOSE("Success in commitOutput loop");
231  } else if (m_rootMonitor != nullptr) {
232  TSocket* socket = m_rootMonitor->Select(1);
233  if (socket != nullptr && socket != (TSocket*)-1) {
234  ATH_MSG_DEBUG("ROOT Monitor got: " << socket);
235  if (socket->IsA() == TServerSocket::Class()) {
236  TSocket* client = ((TServerSocket*)socket)->Accept();
237  client->Send(m_rootClientIndex, 0);
238  client->Send(1, 1);
241  if (m_rootClientCount < (numClients-1)*m_numberOfStreams + 1) {
242  m_rootMonitor->Add(client);
243  ATH_MSG_INFO("ROOT Monitor add client: " << m_rootClientIndex << ", " << client);
244  } else {
245  ATH_MSG_WARNING("ROOT Monitor do NOT add client: " << m_rootClientIndex << ", " << client);
246  client->Close("force");
248  }
249  } else {
250  TMessage* message = nullptr;
251  Int_t result = socket->Recv(message);
252  if (result < 0) {
253  ATH_MSG_ERROR("ROOT Monitor got an error while receiving the message from the socket: " << result);
254  return StatusCode::FAILURE;
255  }
256  if (message == nullptr) {
257  ATH_MSG_WARNING("ROOT Monitor got no message from socket: " << socket);
258  } else if (message->What() == kMESS_STRING) {
259  char str[64];
260  message->ReadString(str, 64);
261  ATH_MSG_INFO("ROOT Monitor client: " << socket << ", " << str);
262  m_rootMonitor->Remove(socket);
263  ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << socket->GetBytesRecv() << ", " << socket->GetBytesSent());
264  socket->Close();
266  if (m_rootMonitor->GetActive() == 0 || m_rootClientCount == 0) {
267  if (!motherClient) {
268  anyActiveClients = false;
269  ATH_MSG_INFO("ROOT Monitor: No more active clients...");
270  } else {
271  motherClient = false;
272  ATH_MSG_INFO("ROOT Monitor: Mother process is done...");
273  if (!m_cnvSvc->commitCatalog().isSuccess()) {
274  ATH_MSG_FATAL("Failed to commit file catalog.");
275  return StatusCode::FAILURE;
276  }
277  }
278  }
279  } else if (message->What() == kMESS_ANY) {
280  long long length;
281  TString filename;
282  int clientId;
283  message->ReadInt(clientId);
284  message->ReadTString(filename);
285  message->ReadLong64(length);
286  ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << clientId << ": " << filename << ", " << length);
287  std::unique_ptr<TMemFile> transient(new TMemFile(filename, message->Buffer() + message->Length(), length, "UPDATE"));
288  message->SetBufferOffset(message->Length() + length);
289  ParallelFileMerger* info = static_cast<ParallelFileMerger*>(m_rootMergers.FindObject(filename));
290  if (!info) {
291  info = new ParallelFileMerger(filename, transient->GetCompressionSettings());
292  m_rootMergers.Add(info);
293  ATH_MSG_INFO("ROOT Monitor ParallelFileMerger: " << info << ", for: " << filename);
294  }
295  info->MergeTrees(transient.get());
296  }
297  delete message; message = nullptr;
298  }
299  }
300  } else if (m_rootMonitor == nullptr) {
301  usleep(100);
302  }
303  // Once commitOutput failed all legacy clients are finished (writing metadata), do not call again.
304  if (sc.isSuccess() || sc.isRecoverable()) {
305  sc = m_cnvSvc->commitOutput("", false);
306  if (sc.isFailure() && !sc.isRecoverable()) {
307  ATH_MSG_INFO("commitOutput failed, metadata done.");
308  if (anyActiveClients && m_rootClientCount == 0) {
309  ATH_MSG_INFO("ROOT Monitor: No clients, terminating the loop...");
310  anyActiveClients = false;
311  }
312  }
313  }
314  }
315  ATH_MSG_INFO("End commitOutput loop");
316  return StatusCode::SUCCESS;
317 }

◆ stop()

StatusCode AthenaRootSharedWriterSvc::stop ( )
override, virtual

Definition at line 319 of file AthenaRootSharedWriterSvc.cxx.

319  {
320  m_rootMergers.Delete();
321  return StatusCode::SUCCESS;
322 }

Friends And Related Function Documentation

◆ SvcFactory< AthenaRootSharedWriterSvc >

friend class SvcFactory< AthenaRootSharedWriterSvc >
friend

Definition at line 1 of file AthenaRootSharedWriterSvc.h.

Member Data Documentation

◆ m_cnvSvc

ServiceHandle<IAthenaPoolCnvSvc> AthenaRootSharedWriterSvc::m_cnvSvc {this,"AthenaPoolCnvSvc","AthenaPoolCnvSvc"}
private

Definition at line 44 of file AthenaRootSharedWriterSvc.h.

◆ m_numberOfStreams

int AthenaRootSharedWriterSvc::m_numberOfStreams
private

Definition at line 51 of file AthenaRootSharedWriterSvc.h.

◆ m_rootClientCount

int AthenaRootSharedWriterSvc::m_rootClientCount
private

Definition at line 50 of file AthenaRootSharedWriterSvc.h.

◆ m_rootClientIndex

int AthenaRootSharedWriterSvc::m_rootClientIndex
private

Definition at line 49 of file AthenaRootSharedWriterSvc.h.

◆ m_rootMergers

THashTable AthenaRootSharedWriterSvc::m_rootMergers
private

Definition at line 48 of file AthenaRootSharedWriterSvc.h.

◆ m_rootMonitor

TMonitor* AthenaRootSharedWriterSvc::m_rootMonitor
private

Definition at line 47 of file AthenaRootSharedWriterSvc.h.

◆ m_rootServerSocket

TServerSocket* AthenaRootSharedWriterSvc::m_rootServerSocket
private

Definition at line 46 of file AthenaRootSharedWriterSvc.h.


The documentation for this class was generated from the following files:
AthenaRootSharedWriterSvc.h
AthenaRootSharedWriterSvc.cxx
grepfile.info
info
Definition: grepfile.py:38
AthenaRootSharedWriterSvc::m_rootClientCount
int m_rootClientCount
Definition: AthenaRootSharedWriterSvc.h:50
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
SGout2dot.alg
alg
Definition: SGout2dot.py:243
get_generator_info.result
result
Definition: get_generator_info.py:21
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
ReweightUtils.message
message
Definition: ReweightUtils.py:15
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
rerun_display.client
client
Definition: rerun_display.py:31
AthenaRootSharedWriterSvc::m_rootClientIndex
int m_rootClientIndex
Definition: AthenaRootSharedWriterSvc.h:49
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaRootSharedWriterSvc::m_rootServerSocket
TServerSocket * m_rootServerSocket
Definition: AthenaRootSharedWriterSvc.h:46
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
ActsTrk::to_string
std::string to_string(const DetectorType &type)
Definition: GeometryDefs.h:34
AthenaRootSharedWriterSvc::m_rootMergers
THashTable m_rootMergers
Definition: AthenaRootSharedWriterSvc.h:48
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
AthenaRootSharedWriterSvc::m_rootMonitor
TMonitor * m_rootMonitor
Definition: AthenaRootSharedWriterSvc.h:47
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:24
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
ParallelFileMerger
Definition: AthenaRootSharedWriterSvc.cxx:56
str
Definition: BTagTrackIpAccessor.cxx:11
AthenaRootSharedWriterSvc::m_numberOfStreams
int m_numberOfStreams
Definition: AthenaRootSharedWriterSvc.h:51
AthenaRootSharedWriterSvc::m_cnvSvc
ServiceHandle< IAthenaPoolCnvSvc > m_cnvSvc
Definition: AthenaRootSharedWriterSvc.h:44
length
double length(const pvec &v)
Definition: FPGATrackSimLLPDoubletHoughTransformTool.cxx:26