ATLAS Offline Software
AthenaRootSharedWriterSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
10 #include "GaudiKernel/IAlgManager.h"
12 
13 #include "TBranch.h"
14 #include "TClass.h"
15 #include "TFile.h"
16 #include "TFileMerger.h"
17 #include "TKey.h"
18 #include "TLeaf.h"
19 #include "TMemFile.h"
20 #include "TMessage.h"
21 #include "TMonitor.h"
22 #include "TServerSocket.h"
23 #include "TSocket.h"
24 #include "TString.h"
25 #include "TTree.h"
26 
27 #include <set>
28 #include <map>
29 
31 struct BranchDesc {
32 public:
33  TClass* clazz;
34  using dummy_ptr_t = std::unique_ptr<void, std::function<void(void*)> >;
35  std::unique_ptr<void, std::function<void(void*)> > dummyptr;
36  void* dummy = 0;
37 
38  BranchDesc(TClass* cl) : clazz(cl) {}
39 
40  void* dummyAddr()
41  {
42  if (clazz) {
43  void(TClass::*dxtor)(void*, Bool_t) = &TClass::Destructor;
44  std::function<void(void*)> del = std::bind(dxtor, clazz, std::placeholders::_1, false);
45  dummyptr = std::unique_ptr<void, std::function<void(void*)> >(clazz->New(), std::move(del));
46  dummy = dummyptr.get();
47  return &dummy;
48  }
49  return nullptr;
50  }
51 };
52 
53 /* Code from ROOT tutorials/net/parallelMergeServer.C, reduced to handle TTrees only */
54 
55 struct ParallelFileMerger : public TObject
56 {
57  TString fFilename;
58  TFileMerger fMerger;
59 
60  ParallelFileMerger(const char *filename, int compress = ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault) : fFilename(filename), fMerger(kFALSE, kTRUE)
61  {
62  fMerger.OutputFile(filename, "RECREATE", compress);
63  }
64 
66  {
67  }
68 
69  ULong_t Hash() const
70  {
71  return fFilename.Hash();
72  }
73 
74  const char* GetName() const
75  {
76  return fFilename;
77  }
78 
79 // Add to toTree every branch of fromTree that toTree does not have yet, and BackFill
// each new branch once per entry already in toTree so all branches stay aligned.
// Used in both directions before merging (output tree -> client tree and vice versa).
// @param fromTree  tree whose branch list is the reference
// @param toTree    tree that receives the missing branches
// @return true if at least one branch was added to toTree
80  bool syncBranches(TTree* fromTree, TTree* toTree)
81  {
82  bool updated = false;
83  const TObjArray* fromBranches = fromTree->GetListOfBranches();
84  const TObjArray* toBranches = toTree->GetListOfBranches();
85  int nBranches = fromBranches->GetEntriesFast();
86  for (int k = 0; k < nBranches; ++k) {
87  TBranch* branch = static_cast<TBranch*>(fromBranches->UncheckedAt(k));
88  if (toBranches->FindObject(branch->GetName()) == nullptr) {
89  TBranch* newBranch = nullptr;
90  TClass* cl = TClass::GetClass(branch->GetClassName());
92  void* empty = desc.dummyAddr();
// NOTE(review): buff is never initialised, yet it is the address of the leaf-list branch
// created below and BackFill() writes its contents — the backfilled values are indeterminate.
93  char buff[32];
// Object branches: same class, basket size and split level; filled from a dummy object.
94  if (strlen(branch->GetClassName()) > 0) {
95  newBranch = toTree->Branch(branch->GetName(), branch->GetClassName(), nullptr, branch->GetBasketSize(), branch->GetSplitLevel());
96  newBranch->SetAddress(empty);
97  } else {
// Leaf-list branches: rebuild a "name/T" leaf list from the first leaf only.
98  TObjArray* outLeaves = branch->GetListOfLeaves();
99  TLeaf* leaf = static_cast<TLeaf*>(outLeaves->UncheckedAt(0));
100  std::string type = leaf->GetTypeName();
101  std::string attr = leaf->GetName();
102  if (type == "Int_t") type = attr + "/I";
103  else if (type == "Short_t") type = attr + "/S";
// NOTE(review): in TTree leaf lists "L" is Long64_t ("G" is Long_t) — confirm the intent.
104  else if (type == "Long_t") type = attr + "/L";
105  else if (type == "UInt_t") type = attr + "/i";
106  else if (type == "UShort_t") type = attr + "/s";
// NOTE(review): duplicate of the previous condition, never taken.
107  else if (type == "UShort_t") type = attr + "/s";
108  else if (type == "Float_t") type = attr + "/F";
109  else if (type == "Double_t") type = attr + "/D";
110  else if (type == "Char_t") type = attr + "/B";
111  else if (type == "UChar_t") type = attr + "/b";
112  else if (type == "Bool_t") type = attr + "/O";
// NOTE(review): any other type name (e.g. Long64_t, ULong64_t) falls through unchanged and is
// passed as the leaf list without the "name/" prefix — presumably not what is intended.
113  newBranch = toTree->Branch(branch->GetName(), buff, type.c_str(), 2048);
114  }
// NOTE(review): newBranch is dereferenced without a null check; GetEntries() returns
// Long64_t and is narrowed to int here.
115  int nEntries = toTree->GetEntries();
116  for (int m = 0; m < nEntries; ++m) {
117  newBranch->BackFill();
118  }
119  updated = true;
120  }
121  }
122  return updated;
123  }
124 
125  Bool_t MergeTrees(TFile *input)
126  {
127  fMerger.AddFile(input);
128  TIter nextKey(input->GetListOfKeys());
129  while (TKey* key = static_cast<TKey*>(nextKey())) {
130  TClass* cl = TClass::GetClass(key->GetClassName());
131  if (cl != nullptr && cl->InheritsFrom("TTree")) {
132  TTree* outCollTree = static_cast<TTree*>(fMerger.GetOutputFile()->Get(key->GetName()));
133  TTree* inCollTree = static_cast<TTree*>(input->Get(key->GetName()));
134  if (inCollTree != nullptr && outCollTree != nullptr) {
135  if (syncBranches(outCollTree, inCollTree)) {
136  input->Write();
137  }
138  syncBranches(inCollTree, outCollTree);
139  }
140  }
141  }
142 
143  Bool_t result = fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable | TFileMerger::kKeepCompression);
144  nextKey = input->GetListOfKeys();
145  while (TKey* key = static_cast<TKey*>(nextKey())) {
146  TClass* cl = TClass::GetClass(key->GetClassName());
147  if (cl != nullptr && 0 != cl->GetResetAfterMerge()) {
148  key->Delete();
149  input->GetListOfKeys()->Remove(key);
150  delete key;
151  }
152  }
153  return result;
154  }
155 };
156 
157 //___________________________________________________________________________
158 AthenaRootSharedWriterSvc::AthenaRootSharedWriterSvc(const std::string& name, ISvcLocator* pSvcLocator)
159  : base_class(name, pSvcLocator)
160  , m_rootServerSocket(nullptr), m_rootMonitor(nullptr), m_rootMergers(), m_rootClientIndex(0), m_rootClientCount(0), m_numberOfStreams(0) {
161 }
162 //___________________________________________________________________________
164  ATH_MSG_INFO("in initialize()");
165 
166  // Initialize IConversionSvc
167  ATH_CHECK(m_cnvSvc.retrieve());
168  IProperty* propertyServer = dynamic_cast<IProperty*>(m_cnvSvc.get());
169  if (propertyServer == nullptr) {
170  ATH_MSG_ERROR("Unable to cast conversion service to IProperty");
171  return StatusCode::FAILURE;
172  } else {
173  std::string propertyName = "ParallelCompression";
174  bool parallelCompression(false);
175  BooleanProperty parallelCompressionProp(propertyName, parallelCompression);
176  if (propertyServer->getProperty(&parallelCompressionProp).isFailure()) {
177  ATH_MSG_INFO("Conversion service does not have ParallelCompression property");
178  } else if (parallelCompressionProp.value()) {
179  int streamPort = 0;
180  propertyName = "StreamPortString";
181  std::string streamPortString("");
182  StringProperty streamPortStringProp(propertyName, streamPortString);
183  if (propertyServer->getProperty(&streamPortStringProp).isFailure()) {
184  ATH_MSG_INFO("Conversion service does not have StreamPortString property, using default: " << streamPort);
185  } else {
186  streamPort = atoi(streamPortStringProp.value().substr(streamPortStringProp.value().find(':') + 1).c_str());
187  }
188  m_rootServerSocket = new TServerSocket(streamPort, (streamPort == 0 ? false : true), 100);
189  if (m_rootServerSocket == nullptr || !m_rootServerSocket->IsValid()) {
190  ATH_MSG_FATAL("Could not create ROOT TServerSocket: " << streamPort);
191  return StatusCode::FAILURE;
192  }
193  streamPort = m_rootServerSocket->GetLocalPort();
194  const std::string newStreamPortString{streamPortStringProp.value().substr(0,streamPortStringProp.value().find(':')+1) + std::to_string(streamPort)};
195  if (propertyServer->setProperty(propertyName,newStreamPortString).isFailure()) {
196  ATH_MSG_FATAL("Could not set Conversion Service property " << propertyName << " from " << streamPortString << " to " << newStreamPortString);
197  return StatusCode::FAILURE;
198  }
199  m_rootMonitor = new TMonitor;
201  ATH_MSG_DEBUG("Successfully created ROOT TServerSocket and added it to TMonitor: ready to accept connections, " << streamPort);
202  }
203  }
204  // Count the number of output streams
205  const IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
206  for (const auto& alg : algMgr->getAlgorithms()) {
207  if (alg->type() == "AthenaOutputStream") {
208  ATH_MSG_DEBUG("Counting " << alg->name() << " as an output stream algorithm");
210  }
211  }
212  if (m_numberOfStreams == 0) {
213  ATH_MSG_WARNING("No output stream algorithm found, setting the number of streams to 1");
214  m_numberOfStreams = 1;
215  } else {
216  ATH_MSG_INFO("Found a total of " << m_numberOfStreams << " output streams");
217  }
218 
219  return StatusCode::SUCCESS;
220 }
221 //___________________________________________________________________________
222 StatusCode AthenaRootSharedWriterSvc::share(int numClients, bool motherClient) {
223  ATH_MSG_DEBUG("Start commitOutput loop");
224  StatusCode sc = m_cnvSvc->commitOutput("", false);
225 
226  // Allow ROOT clients to start up (by setting active clients)
227  // and wait to stop the ROOT server until all clients are done and metadata is written (commitOutput fail).
228  bool anyActiveClients = (m_rootServerSocket != nullptr);
229  while (sc.isSuccess() || sc.isRecoverable() || anyActiveClients) {
230  if (sc.isSuccess()) {
231  ATH_MSG_VERBOSE("Success in commitOutput loop");
232  } else if (m_rootMonitor != nullptr) {
233  TSocket* socket = m_rootMonitor->Select(1);
234  if (socket != nullptr && socket != (TSocket*)-1) {
235  ATH_MSG_DEBUG("ROOT Monitor got: " << socket);
236  if (socket->IsA() == TServerSocket::Class()) {
237  TSocket* client = ((TServerSocket*)socket)->Accept();
238  client->Send(m_rootClientIndex, 0);
239  client->Send(1, 1);
242  if (m_rootClientCount < (numClients-1)*m_numberOfStreams + 1) {
243  m_rootMonitor->Add(client);
244  ATH_MSG_INFO("ROOT Monitor add client: " << m_rootClientIndex << ", " << client);
245  } else {
246  ATH_MSG_WARNING("ROOT Monitor do NOT add client: " << m_rootClientIndex << ", " << client);
247  client->Close("force");
249  }
250  } else {
251  TMessage* message = nullptr;
252  Int_t result = socket->Recv(message);
253  if (result < 0) {
254  ATH_MSG_ERROR("ROOT Monitor got an error while receiving the message from the socket: " << result);
255  return StatusCode::FAILURE;
256  }
257  if (message == nullptr) {
258  ATH_MSG_WARNING("ROOT Monitor got no message from socket: " << socket);
259  } else if (message->What() == kMESS_STRING) {
260  char str[64];
261  message->ReadString(str, 64);
262  ATH_MSG_INFO("ROOT Monitor client: " << socket << ", " << str);
263  m_rootMonitor->Remove(socket);
264  ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << socket->GetBytesRecv() << ", " << socket->GetBytesSent());
265  socket->Close();
267  if (m_rootMonitor->GetActive() == 0 || m_rootClientCount == 0) {
268  if (!motherClient) {
269  anyActiveClients = false;
270  ATH_MSG_INFO("ROOT Monitor: No more active clients...");
271  } else {
272  motherClient = false;
273  ATH_MSG_INFO("ROOT Monitor: Mother process is done...");
274  if (!m_cnvSvc->commitCatalog().isSuccess()) {
275  ATH_MSG_FATAL("Failed to commit file catalog.");
276  return StatusCode::FAILURE;
277  }
278  }
279  }
280  } else if (message->What() == kMESS_ANY) {
281  long long length;
282  TString filename;
283  int clientId;
284  message->ReadInt(clientId);
285  message->ReadTString(filename);
286  message->ReadLong64(length);
287  ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << clientId << ": " << filename << ", " << length);
288  std::unique_ptr<TMemFile> transient(new TMemFile(filename, message->Buffer() + message->Length(), length, "UPDATE"));
289  message->SetBufferOffset(message->Length() + length);
290  ParallelFileMerger* info = static_cast<ParallelFileMerger*>(m_rootMergers.FindObject(filename));
291  if (!info) {
292  info = new ParallelFileMerger(filename, transient->GetCompressionSettings());
293  m_rootMergers.Add(info);
294  ATH_MSG_INFO("ROOT Monitor ParallelFileMerger: " << info << ", for: " << filename);
295  }
296  info->MergeTrees(transient.get());
297  }
298  delete message; message = nullptr;
299  }
300  }
301  } else if (m_rootMonitor == nullptr) {
302  usleep(100);
303  }
304  // Once commitOutput failed all legacy clients are finished (writing metadata), do not call again.
305  if (sc.isSuccess() || sc.isRecoverable()) {
306  sc = m_cnvSvc->commitOutput("", false);
307  if (sc.isFailure() && !sc.isRecoverable()) {
308  ATH_MSG_INFO("commitOutput failed, metadata done.");
309  if (anyActiveClients && m_rootClientCount == 0) {
310  ATH_MSG_INFO("ROOT Monitor: No clients, terminating the loop...");
311  anyActiveClients = false;
312  }
313  }
314  }
315  }
316  ATH_MSG_INFO("End commitOutput loop");
317  return StatusCode::SUCCESS;
318 }
319 //___________________________________________________________________________
321  m_rootMergers.Delete();
322  return StatusCode::SUCCESS;
323 }
324 //___________________________________________________________________________
326  ATH_MSG_INFO("in finalize()");
327  delete m_rootMonitor; m_rootMonitor = nullptr;
328  delete m_rootServerSocket; m_rootServerSocket = nullptr;
329  return StatusCode::SUCCESS;
330 }
grepfile.info
info
Definition: grepfile.py:38
AthenaRootSharedWriterSvc::m_rootClientCount
int m_rootClientCount
Definition: AthenaRootSharedWriterSvc.h:50
TFileMerger.h
get_hdefs.buff
buff
Definition: get_hdefs.py:61
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
SGout2dot.alg
alg
Definition: SGout2dot.py:243
get_generator_info.result
result
Definition: get_generator_info.py:21
python.SystemOfUnits.m
int m
Definition: SystemOfUnits.py:91
Amg::compress
void compress(const AmgSymMatrix(N) &covMatrix, std::vector< float > &vec)
Definition: EventPrimitivesHelpers.h:56
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
BranchDesc::dummy
void * dummy
Definition: AthenaRootSharedWriterSvc.cxx:36
BranchDesc::dummyAddr
void * dummyAddr()
Definition: AthenaRootSharedWriterSvc.cxx:40
AthenaRootSharedWriterSvc::AthenaRootSharedWriterSvc
AthenaRootSharedWriterSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Definition: AthenaRootSharedWriterSvc.cxx:158
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
ParallelFileMerger::syncBranches
bool syncBranches(TTree *fromTree, TTree *toTree)
Definition: AthenaRootSharedWriterSvc.cxx:80
empty
bool empty(TH1 *h)
Definition: computils.cxx:294
ReweightUtils.message
message
Definition: ReweightUtils.py:15
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
rerun_display.client
client
Definition: rerun_display.py:31
AthenaRootSharedWriterSvc::m_rootClientIndex
int m_rootClientIndex
Definition: AthenaRootSharedWriterSvc.h:49
AthenaRootSharedWriterSvc::share
virtual StatusCode share(int numClients=0, bool motherClient=false) override
Definition: AthenaRootSharedWriterSvc.cxx:222
CaloCondBlobAlgs_fillNoiseFromASCII.desc
desc
Definition: CaloCondBlobAlgs_fillNoiseFromASCII.py:54
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
ParallelFileMerger::Hash
ULong_t Hash() const
Definition: AthenaRootSharedWriterSvc.cxx:69
PlotPulseshapeFromCool.input
input
Definition: PlotPulseshapeFromCool.py:106
ParallelFileMerger::fFilename
TString fFilename
Definition: AthenaRootSharedWriterSvc.cxx:57
ParallelFileMerger::MergeTrees
Bool_t MergeTrees(TFile *input)
Definition: AthenaRootSharedWriterSvc.cxx:125
AthenaRootSharedWriterSvc::finalize
virtual StatusCode finalize() override
Definition: AthenaRootSharedWriterSvc.cxx:325
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaRootSharedWriterSvc::m_rootServerSocket
TServerSocket * m_rootServerSocket
Definition: AthenaRootSharedWriterSvc.h:46
ParallelFileMerger::fMerger
TFileMerger fMerger
Definition: AthenaRootSharedWriterSvc.cxx:58
BranchDesc::clazz
TClass * clazz
Definition: AthenaRootSharedWriterSvc.cxx:33
ParallelFileMerger::GetName
const char * GetName() const
Definition: AthenaRootSharedWriterSvc.cxx:74
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:221
ActsTrk::to_string
std::string to_string(const DetectorType &type)
Definition: GeometryDefs.h:34
BranchDesc::dummyptr
std::unique_ptr< void, std::function< void(void *)> > dummyptr
Definition: AthenaRootSharedWriterSvc.cxx:35
ParallelFileMerger::ParallelFileMerger
ParallelFileMerger(const char *filename, int compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Definition: AthenaRootSharedWriterSvc.cxx:60
AthenaRootSharedWriterSvc::stop
virtual StatusCode stop() override
Definition: AthenaRootSharedWriterSvc.cxx:320
AthenaRootSharedWriterSvc::initialize
virtual StatusCode initialize() override
Gaudi Service Interface method implementations:
Definition: AthenaRootSharedWriterSvc.cxx:163
ParallelFileMerger::~ParallelFileMerger
~ParallelFileMerger()
Definition: AthenaRootSharedWriterSvc.cxx:65
AthenaRootSharedWriterSvc::m_rootMergers
THashTable m_rootMergers
Definition: AthenaRootSharedWriterSvc.h:48
RTTAlgmain.branch
branch
Definition: RTTAlgmain.py:61
TSocket.h
BranchDesc::dummy_ptr_t
std::unique_ptr< void, std::function< void(void *)> > dummy_ptr_t
Definition: AthenaRootSharedWriterSvc.cxx:34
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
AthenaRootSharedWriterSvc::m_rootMonitor
TMonitor * m_rootMonitor
Definition: AthenaRootSharedWriterSvc.h:47
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:24
AthenaRootSharedWriterSvc.h
This file contains the class definition for the AthenaRootSharedWriterSvc class.
BranchDesc::BranchDesc
BranchDesc(TClass *cl)
Definition: AthenaRootSharedWriterSvc.cxx:38
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
ParallelFileMerger
Definition: AthenaRootSharedWriterSvc.cxx:56
str
Definition: BTagTrackIpAccessor.cxx:11
AthenaRootSharedWriterSvc::m_numberOfStreams
int m_numberOfStreams
Definition: AthenaRootSharedWriterSvc.h:51
BranchDesc
Definition of a branch descriptor from RootTreeContainer.
Definition: AthenaRootSharedWriterSvc.cxx:31
dqBeamSpot.nEntries
int nEntries
Definition: dqBeamSpot.py:73
AthenaRootSharedWriterSvc::m_cnvSvc
ServiceHandle< IAthenaPoolCnvSvc > m_cnvSvc
Definition: AthenaRootSharedWriterSvc.h:44
dq_make_web_display.cl
cl
print [x.__class__ for x in toList(dqregion.getSubRegions()) ]
Definition: dq_make_web_display.py:26
length
double length(const pvec &v)
Definition: FPGATrackSimLLPDoubletHoughTransformTool.cxx:26
fitman.k
k
Definition: fitman.py:528
mapkey::key
key
Definition: TElectronEfficiencyCorrectionTool.cxx:37