ATLAS Offline Software
AthenaRootSharedWriterSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 */
4 
10 #include "GaudiKernel/IAlgManager.h"
12 
13 #include "TBranch.h"
14 #include "TClass.h"
15 #include "TFile.h"
16 #include "TFileMerger.h"
17 #include "TKey.h"
18 #include "TLeaf.h"
19 #include "TMemFile.h"
20 #include "TMessage.h"
21 #include "TMonitor.h"
22 #include "TServerSocket.h"
23 #include "TSocket.h"
24 #include "TString.h"
25 #include "TTree.h"
26 
27 #include <set>
28 #include <map>
29 
31 struct BranchDesc {
32 public:
33  TClass* clazz;
34  using dummy_ptr_t = std::unique_ptr<void, std::function<void(void*)> >;
35  std::unique_ptr<void, std::function<void(void*)> > dummyptr;
36  void* dummy = 0;
37 
38  BranchDesc(TClass* cl) : clazz(cl) {}
39 
40  void* dummyAddr()
41  {
42  if (clazz) {
43  void(TClass::*dxtor)(void*, Bool_t) = &TClass::Destructor;
44  std::function<void(void*)> del = std::bind(dxtor, clazz, std::placeholders::_1, false);
45  dummyptr = std::unique_ptr<void, std::function<void(void*)> >(clazz->New(), std::move(del));
46  dummy = dummyptr.get();
47  return &dummy;
48  }
49  return nullptr;
50  }
51 };
52 
53 /* Code from ROOT tutorials/net/parallelMergeServer.C, reduced to handle TTrees only */
54 
55 struct ParallelFileMerger : public TObject
56 {
57  TString fFilename;
58  TFileMerger fMerger;
59 
60  ParallelFileMerger(const char *filename, int compress = ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault) : fFilename(filename), fMerger(kFALSE, kTRUE)
61  {
62  fMerger.OutputFile(filename, "RECREATE", compress);
63  }
64 
66  {
67  }
68 
69  ULong_t Hash() const
70  {
71  return fFilename.Hash();
72  }
73 
74  const char* GetName() const
75  {
76  return fFilename;
77  }
78 
79 // Add missing branches to client tree and BackFill before merging
80  bool syncBranches(TTree* fromTree, TTree* toTree)
81  {
82  bool updated = false;
83  const TObjArray* fromBranches = fromTree->GetListOfBranches();
84  const TObjArray* toBranches = toTree->GetListOfBranches();
85  int nBranches = fromBranches->GetEntriesFast();
86  for (int k = 0; k < nBranches; ++k) {
87  TBranch* branch = static_cast<TBranch*>(fromBranches->UncheckedAt(k));
88  if (toBranches->FindObject(branch->GetName()) == nullptr) {
89  TBranch* newBranch = nullptr;
90  TClass* cl = TClass::GetClass(branch->GetClassName());
92  void* empty = desc.dummyAddr();
93  if (strlen(branch->GetClassName()) > 0) {
94  newBranch = toTree->Branch(branch->GetName(), branch->GetClassName(), nullptr, branch->GetBasketSize(), branch->GetSplitLevel());
95  newBranch->SetAddress(empty);
96  } else {
97  TObjArray* outLeaves = branch->GetListOfLeaves();
98  TLeaf* leaf = static_cast<TLeaf*>(outLeaves->UncheckedAt(0));
99  std::string type = leaf->GetTypeName();
100  std::string attr = leaf->GetName();
101  if (type == "Int_t") type = attr + "/I";
102  else if (type == "Short_t") type = attr + "/S";
103  else if (type == "Long_t") type = attr + "/L";
104  else if (type == "UInt_t") type = attr + "/i";
105  else if (type == "UShort_t") type = attr + "/s";
106  else if (type == "ULong_t") type = attr + "/l";
107  else if (type == "Float_t") type = attr + "/F";
108  else if (type == "Double_t") type = attr + "/D";
109  else if (type == "Char_t") type = attr + "/B";
110  else if (type == "UChar_t") type = attr + "/b";
111  else if (type == "Bool_t") type = attr + "/O";
112  newBranch = toTree->Branch(branch->GetName(), static_cast<void*>(nullptr), type.c_str(), 2048);
113  }
114  int nEntries = toTree->GetEntries();
115  for (int m = 0; m < nEntries; ++m) {
116  newBranch->BackFill();
117  }
118  updated = true;
119  }
120  }
121  return updated;
122  }
123 
124  Bool_t MergeTrees(TFile *input)
125  {
126  fMerger.AddFile(input);
127  TIter nextKey(input->GetListOfKeys());
128  while (TKey* key = static_cast<TKey*>(nextKey())) {
129  TClass* cl = TClass::GetClass(key->GetClassName());
130  if (cl != nullptr && cl->InheritsFrom("TTree")) {
131  TTree* outCollTree = static_cast<TTree*>(fMerger.GetOutputFile()->Get(key->GetName()));
132  TTree* inCollTree = static_cast<TTree*>(input->Get(key->GetName()));
133  if (inCollTree != nullptr && outCollTree != nullptr) {
134  if (syncBranches(outCollTree, inCollTree)) {
135  input->Write();
136  }
137  syncBranches(inCollTree, outCollTree);
138  }
139  }
140  }
141 
142  Bool_t result = fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable | TFileMerger::kKeepCompression);
143  nextKey = input->GetListOfKeys();
144  while (TKey* key = static_cast<TKey*>(nextKey())) {
145  TClass* cl = TClass::GetClass(key->GetClassName());
146  if (cl != nullptr && 0 != cl->GetResetAfterMerge()) {
147  key->Delete();
148  input->GetListOfKeys()->Remove(key);
149  delete key;
150  }
151  }
152  return result;
153  }
154 };
155 
156 //___________________________________________________________________________
157 AthenaRootSharedWriterSvc::AthenaRootSharedWriterSvc(const std::string& name, ISvcLocator* pSvcLocator)
158  : base_class(name, pSvcLocator)
159  , m_rootServerSocket(nullptr), m_rootMonitor(nullptr), m_rootMergers(), m_rootClientIndex(0), m_rootClientCount(0), m_numberOfStreams(0) {
160 }
161 //___________________________________________________________________________
// Body of AthenaRootSharedWriterSvc::initialize() (its signature line is not
// visible in this listing).
// 1. Retrieves the conversion service.
// 2. If the service's "ParallelCompression" property can be read and is true,
//    opens a ROOT TServerSocket plus a TMonitor for shared-writer clients, and
//    writes the port actually bound back into the service's "StreamPortString"
//    property.
// 3. Counts the algorithms of type "AthenaOutputStream", using 1 if none are found.
// Returns FAILURE if:
// - the service cannot be retrieved or used as an IProperty;
// - the server socket is invalid;
// - the property cannot be updated.
163  ATH_MSG_INFO("in initialize()");
164 
165  // Initialize IConversionSvc
166  ATH_CHECK(m_cnvSvc.retrieve());
167  IProperty* propertyServer = dynamic_cast<IProperty*>(m_cnvSvc.get());
168  if (propertyServer == nullptr) {
169  ATH_MSG_ERROR("Unable to cast conversion service to IProperty");
170  return StatusCode::FAILURE;
171  } else {
172  std::string propertyName = "ParallelCompression";
173  bool parallelCompression(false);
174  BooleanProperty parallelCompressionProp(propertyName, parallelCompression);
175  if (propertyServer->getProperty(&parallelCompressionProp).isFailure()) {
176  ATH_MSG_INFO("Conversion service does not have ParallelCompression property");
177  } else if (parallelCompressionProp.value()) {
178  int streamPort = 0;
179  propertyName = "StreamPortString";
180  std::string streamPortString("");
181  StringProperty streamPortStringProp(propertyName, streamPortString);
182  if (propertyServer->getProperty(&streamPortStringProp).isFailure()) {
183  ATH_MSG_INFO("Conversion service does not have StreamPortString property, using default: " << streamPort);
184  } else {
// Port = text after the first ':' (the whole string if there is none); atoi() yields 0 for non-numeric text.
185  streamPort = atoi(streamPortStringProp.value().substr(streamPortStringProp.value().find(':') + 1).c_str());
186  }
// Address reuse is requested only for an explicit (non-zero) port; 100 is the listen backlog.
// With port 0 ROOT chooses the port; the bound port is read back via GetLocalPort() below.
187  m_rootServerSocket = new TServerSocket(streamPort, (streamPort == 0 ? false : true), 100);
188  if (m_rootServerSocket == nullptr || !m_rootServerSocket->IsValid()) {
189  ATH_MSG_FATAL("Could not create ROOT TServerSocket: " << streamPort);
190  return StatusCode::FAILURE;
191  }
192  streamPort = m_rootServerSocket->GetLocalPort();
// Keep everything up to and including the first ':' (nothing if absent) and append the bound port.
193  const std::string newStreamPortString{streamPortStringProp.value().substr(0,streamPortStringProp.value().find(':')+1) + std::to_string(streamPort)};
194  if (propertyServer->setProperty(propertyName,newStreamPortString).isFailure()) {
// NOTE(review): streamPortString is the local default "" here, not the property's
// previous value; streamPortStringProp.value() was probably intended in this message.
195  ATH_MSG_FATAL("Could not set Conversion Service property " << propertyName << " from " << streamPortString << " to " << newStreamPortString);
196  return StatusCode::FAILURE;
197  }
198  m_rootMonitor = new TMonitor;
// Source line 199 is not visible in this listing. Judging by the message below,
// it presumably registers m_rootServerSocket with m_rootMonitor.
200  ATH_MSG_DEBUG("Successfully created ROOT TServerSocket and added it to TMonitor: ready to accept connections, " << streamPort);
201  }
202  }
203  // Count the number of output streams
204  const IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
205  for (const auto& alg : algMgr->getAlgorithms()) {
206  if (alg->type() == "AthenaOutputStream") {
207  ATH_MSG_DEBUG("Counting " << alg->name() << " as an output stream algorithm");
// (source line 208, not visible here, presumably increments m_numberOfStreams)
209  }
210  }
211  if (m_numberOfStreams == 0) {
212  ATH_MSG_WARNING("No output stream algorithm found, setting the number of streams to 1");
213  m_numberOfStreams = 1;
214  } else {
215  ATH_MSG_INFO("Found a total of " << m_numberOfStreams << " output streams");
216  }
217 
218  return StatusCode::SUCCESS;
219 }
220 //___________________________________________________________________________
// Service loop. Calls m_cnvSvc->commitOutput("", false) repeatedly. Whenever the
// last call did not succeed and a TMonitor exists, it services the ROOT client
// sockets instead:
// - accepts new connections;
// - merges received in-memory files into one ParallelFileMerger per file name;
// - retires clients that send a string message.
// The loop runs until commitOutput() has failed non-recoverably AND no ROOT
// clients are considered active.
//   numClients   - a client is admitted only while m_rootClientCount is below
//                  (numClients-1)*m_numberOfStreams + 1.
//   motherClient - when true, the first time the "no active clients" condition
//                  is met the file catalog is committed instead of ending the loop.
// Returns FAILURE on a socket receive error or a failed commitCatalog(),
// SUCCESS otherwise.
// NOTE: source lines 239-240, 247 and 265 are not visible in this listing; they
// presumably maintain m_rootClientIndex / m_rootClientCount.
221 StatusCode AthenaRootSharedWriterSvc::share(int numClients, bool motherClient) {
222  ATH_MSG_DEBUG("Start commitOutput loop");
223  StatusCode sc = m_cnvSvc->commitOutput("", false);
224 
225  // Allow ROOT clients to start up (by setting active clients)
226  // and wait to stop the ROOT server until all clients are done and metadata is written (commitOutput fail).
227  bool anyActiveClients = (m_rootServerSocket != nullptr);
228  while (sc.isSuccess() || sc.isRecoverable() || anyActiveClients) {
229  if (sc.isSuccess()) {
230  ATH_MSG_VERBOSE("Success in commitOutput loop");
231  } else if (m_rootMonitor != nullptr) {
// Wait up to 1 ms for socket activity; per ROOT's TMonitor::Select, (TSocket*)-1 means timeout.
232  TSocket* socket = m_rootMonitor->Select(1);
233  if (socket != nullptr && socket != (TSocket*)-1) {
234  ATH_MSG_DEBUG("ROOT Monitor got: " << socket);
235  if (socket->IsA() == TServerSocket::Class()) {
// Activity on the server socket: a new client connection.
// NOTE(review): the result of Accept() is used without checking for nullptr / (TSocket*)-1.
236  TSocket* client = ((TServerSocket*)socket)->Accept();
// Send the client its index, then the value 1 (kinds 0 and 1); the client-side protocol is not visible here.
237  client->Send(m_rootClientIndex, 0);
238  client->Send(1, 1);
// Admit the client only while below the limit; otherwise force-close the connection.
241  if (m_rootClientCount < (numClients-1)*m_numberOfStreams + 1) {
242  m_rootMonitor->Add(client);
243  ATH_MSG_INFO("ROOT Monitor add client: " << m_rootClientIndex << ", " << client);
244  } else {
245  ATH_MSG_WARNING("ROOT Monitor do NOT add client: " << m_rootClientIndex << ", " << client);
246  client->Close("force");
248  }
249  } else {
// Activity on a client socket: receive one message.
250  TMessage* message = nullptr;
251  Int_t result = socket->Recv(message);
252  if (result < 0) {
253  ATH_MSG_ERROR("ROOT Monitor got an error while receiving the message from the socket: " << result);
254  return StatusCode::FAILURE;
255  }
256  if (message == nullptr) {
257  ATH_MSG_WARNING("ROOT Monitor got no message from socket: " << socket);
258  } else if (message->What() == kMESS_STRING) {
// String message: treated as the client's end-of-work notice; stop monitoring and close its socket.
// NOTE(review): the closed TSocket is not deleted here; its ownership is not visible in this listing.
259  char str[64];
260  message->ReadString(str, 64);
261  ATH_MSG_INFO("ROOT Monitor client: " << socket << ", " << str);
262  m_rootMonitor->Remove(socket);
263  ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << socket->GetBytesRecv() << ", " << socket->GetBytesSent());
264  socket->Close();
// If no monitored socket is active (or the client count is zero), either stop treating clients
// as active, or, if the mother process is still pending, mark it done and commit the file catalog.
266  if (m_rootMonitor->GetActive() == 0 || m_rootClientCount == 0) {
267  if (!motherClient) {
268  anyActiveClients = false;
269  ATH_MSG_INFO("ROOT Monitor: No more active clients...");
270  } else {
271  motherClient = false;
272  ATH_MSG_INFO("ROOT Monitor: Mother process is done...");
273  if (!m_cnvSvc->commitCatalog().isSuccess()) {
274  ATH_MSG_FATAL("Failed to commit file catalog.");
275  return StatusCode::FAILURE;
276  }
277  }
278  }
279  } else if (message->What() == kMESS_ANY) {
// Payload read here: int clientId, TString filename, Long64 length, then 'length' bytes of a
// ROOT file image. The image is opened as a TMemFile and merged into the ParallelFileMerger
// for 'filename', which is created on first use.
280  long long length;
281  TString filename;
282  int clientId;
283  message->ReadInt(clientId);
284  message->ReadTString(filename);
285  message->ReadLong64(length);
286  ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << clientId << ": " << filename << ", " << length);
287  std::unique_ptr<TMemFile> transient(new TMemFile(filename, message->Buffer() + message->Length(), length, "UPDATE"));
// Move the message buffer offset past the file image.
288  message->SetBufferOffset(message->Length() + length);
289  ParallelFileMerger* info = static_cast<ParallelFileMerger*>(m_rootMergers.FindObject(filename));
290  if (!info) {
291  info = new ParallelFileMerger(filename, transient->GetCompressionSettings());
292  m_rootMergers.Add(info);
293  ATH_MSG_INFO("ROOT Monitor ParallelFileMerger: " << info << ", for: " << filename);
294  }
295  info->MergeTrees(transient.get());
296  }
297  delete message; message = nullptr;
298  }
299  }
300  } else if (m_rootMonitor == nullptr) {
// Always true when reached (the previous branch handles a non-null monitor): brief back-off.
301  usleep(100);
302  }
303  // Once commitOutput failed all legacy clients are finished (writing metadata), do not call again.
304  if (sc.isSuccess() || sc.isRecoverable()) {
305  sc = m_cnvSvc->commitOutput("", false);
306  if (sc.isFailure() && !sc.isRecoverable()) {
307  ATH_MSG_INFO("commitOutput failed, metadata done.");
308  if (anyActiveClients && m_rootClientCount == 0) {
309  ATH_MSG_INFO("ROOT Monitor: No clients, terminating the loop...");
310  anyActiveClients = false;
311  }
312  }
313  }
314  }
315  ATH_MSG_INFO("End commitOutput loop");
316  return StatusCode::SUCCESS;
317 }
318 //___________________________________________________________________________
// Body of AthenaRootSharedWriterSvc::stop() (its signature line is not visible in
// this listing). Deletes every ParallelFileMerger held in m_rootMergers;
// THashTable::Delete() deletes the contained objects. Destroying a merger
// destroys its TFileMerger, which presumably closes the merged output file;
// confirm against TFileMerger.
320  m_rootMergers.Delete();
321  return StatusCode::SUCCESS;
322 }
323 //___________________________________________________________________________
// Body of AthenaRootSharedWriterSvc::finalize() (its signature line is not visible
// in this listing). Deletes the TMonitor and TServerSocket created in initialize()
// and resets both pointers. Deleting a null pointer is a no-op, so this is also
// safe when they were never created.
325  ATH_MSG_INFO("in finalize()");
326  delete m_rootMonitor; m_rootMonitor = nullptr;
327  delete m_rootServerSocket; m_rootServerSocket = nullptr;
328  return StatusCode::SUCCESS;
329 }
grepfile.info
info
Definition: grepfile.py:38
AthenaRootSharedWriterSvc::m_rootClientCount
int m_rootClientCount
Definition: AthenaRootSharedWriterSvc.h:50
TFileMerger.h
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
SGout2dot.alg
alg
Definition: SGout2dot.py:243
get_generator_info.result
result
Definition: get_generator_info.py:21
python.SystemOfUnits.m
int m
Definition: SystemOfUnits.py:91
Amg::compress
void compress(const AmgSymMatrix(N) &covMatrix, std::vector< float > &vec)
Definition: EventPrimitivesHelpers.h:56
ATH_MSG_INFO
#define ATH_MSG_INFO(x)
Definition: AthMsgStreamMacros.h:31
BranchDesc::dummy
void * dummy
Definition: AthenaRootSharedWriterSvc.cxx:36
BranchDesc::dummyAddr
void * dummyAddr()
Definition: AthenaRootSharedWriterSvc.cxx:40
AthenaRootSharedWriterSvc::AthenaRootSharedWriterSvc
AthenaRootSharedWriterSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
Definition: AthenaRootSharedWriterSvc.cxx:157
ATH_MSG_VERBOSE
#define ATH_MSG_VERBOSE(x)
Definition: AthMsgStreamMacros.h:28
ParallelFileMerger::syncBranches
bool syncBranches(TTree *fromTree, TTree *toTree)
Definition: AthenaRootSharedWriterSvc.cxx:80
empty
bool empty(TH1 *h)
Definition: computils.cxx:295
ReweightUtils.message
message
Definition: ReweightUtils.py:15
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
rerun_display.client
client
Definition: rerun_display.py:31
AthenaRootSharedWriterSvc::m_rootClientIndex
int m_rootClientIndex
Definition: AthenaRootSharedWriterSvc.h:49
AthenaRootSharedWriterSvc::share
virtual StatusCode share(int numClients=0, bool motherClient=false) override
Definition: AthenaRootSharedWriterSvc.cxx:221
CaloCondBlobAlgs_fillNoiseFromASCII.desc
desc
Definition: CaloCondBlobAlgs_fillNoiseFromASCII.py:54
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
ParallelFileMerger::Hash
ULong_t Hash() const
Definition: AthenaRootSharedWriterSvc.cxx:69
PlotPulseshapeFromCool.input
input
Definition: PlotPulseshapeFromCool.py:106
ParallelFileMerger::fFilename
TString fFilename
Definition: AthenaRootSharedWriterSvc.cxx:57
ParallelFileMerger::MergeTrees
Bool_t MergeTrees(TFile *input)
Definition: AthenaRootSharedWriterSvc.cxx:124
AthenaRootSharedWriterSvc::finalize
virtual StatusCode finalize() override
Definition: AthenaRootSharedWriterSvc.cxx:324
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
AthenaRootSharedWriterSvc::m_rootServerSocket
TServerSocket * m_rootServerSocket
Definition: AthenaRootSharedWriterSvc.h:46
ParallelFileMerger::fMerger
TFileMerger fMerger
Definition: AthenaRootSharedWriterSvc.cxx:58
BranchDesc::clazz
TClass * clazz
Definition: AthenaRootSharedWriterSvc.cxx:33
ParallelFileMerger::GetName
const char * GetName() const
Definition: AthenaRootSharedWriterSvc.cxx:74
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:228
ActsTrk::to_string
std::string to_string(const DetectorType &type)
Definition: GeometryDefs.h:34
BranchDesc::dummyptr
std::unique_ptr< void, std::function< void(void *)> > dummyptr
Definition: AthenaRootSharedWriterSvc.cxx:35
ParallelFileMerger::ParallelFileMerger
ParallelFileMerger(const char *filename, int compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Definition: AthenaRootSharedWriterSvc.cxx:60
AthenaRootSharedWriterSvc::stop
virtual StatusCode stop() override
Definition: AthenaRootSharedWriterSvc.cxx:319
AthenaRootSharedWriterSvc::initialize
virtual StatusCode initialize() override
Gaudi Service Interface method implementations:
Definition: AthenaRootSharedWriterSvc.cxx:162
ParallelFileMerger::~ParallelFileMerger
~ParallelFileMerger()
Definition: AthenaRootSharedWriterSvc.cxx:65
AthenaRootSharedWriterSvc::m_rootMergers
THashTable m_rootMergers
Definition: AthenaRootSharedWriterSvc.h:48
RTTAlgmain.branch
branch
Definition: RTTAlgmain.py:61
TSocket.h
BranchDesc::dummy_ptr_t
std::unique_ptr< void, std::function< void(void *)> > dummy_ptr_t
Definition: AthenaRootSharedWriterSvc.cxx:34
ATH_MSG_WARNING
#define ATH_MSG_WARNING(x)
Definition: AthMsgStreamMacros.h:32
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
AthenaRootSharedWriterSvc::m_rootMonitor
TMonitor * m_rootMonitor
Definition: AthenaRootSharedWriterSvc.h:47
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
CaloCellTimeCorrFiller.filename
filename
Definition: CaloCellTimeCorrFiller.py:24
AthenaRootSharedWriterSvc.h
This file contains the class definition for the AthenaRootSharedWriterSvc class.
BranchDesc::BranchDesc
BranchDesc(TClass *cl)
Definition: AthenaRootSharedWriterSvc.cxx:38
CxxUtils::atoi
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...
Definition: Control/CxxUtils/Root/StringUtils.cxx:85
ParallelFileMerger
Definition: AthenaRootSharedWriterSvc.cxx:56
str
Definition: BTagTrackIpAccessor.cxx:11
AthenaRootSharedWriterSvc::m_numberOfStreams
int m_numberOfStreams
Definition: AthenaRootSharedWriterSvc.h:51
BranchDesc
Definition of a branch descriptor from RootTreeContainer.
Definition: AthenaRootSharedWriterSvc.cxx:31
dqBeamSpot.nEntries
int nEntries
Definition: dqBeamSpot.py:73
AthenaRootSharedWriterSvc::m_cnvSvc
ServiceHandle< IAthenaPoolCnvSvc > m_cnvSvc
Definition: AthenaRootSharedWriterSvc.h:44
dq_make_web_display.cl
cl
print [x.__class__ for x in toList(dqregion.getSubRegions()) ]
Definition: dq_make_web_display.py:26
length
double length(const pvec &v)
Definition: FPGATrackSimLLPDoubletHoughTransformTool.cxx:26
fitman.k
k
Definition: fitman.py:528
mapkey::key
key
Definition: TElectronEfficiencyCorrectionTool.cxx:37