ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaRootSharedWriterSvc.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
9
10#include "GaudiKernel/IAlgManager.h"
12
13#include "TBranch.h"
14#include "TClass.h"
15#include "TFile.h"
16#include "TFileMerger.h"
17#include "TKey.h"
18#include "TLeaf.h"
19#include "TMemFile.h"
20#include "TMessage.h"
21#include "TMonitor.h"
22#include "TServerSocket.h"
23#include "TSocket.h"
24#include "TString.h"
25#include "TTree.h"
26
27#include <set>
28#include <map>
29
31struct BranchDesc {
32public:
33 TClass* clazz;
34 using dummy_ptr_t = std::unique_ptr<void, std::function<void(void*)> >;
35 std::unique_ptr<void, std::function<void(void*)> > dummyptr;
36 void* dummy = 0;
37
38 BranchDesc(TClass* cl) : clazz(cl) {}
39
40 void* dummyAddr()
41 {
42 if (clazz) {
43 void(TClass::*dxtor)(void*, Bool_t) = &TClass::Destructor;
44 std::function<void(void*)> del = std::bind(dxtor, clazz, std::placeholders::_1, false);
45 dummyptr = std::unique_ptr<void, std::function<void(void*)> >(clazz->New(), std::move(del));
46 dummy = dummyptr.get();
47 return &dummy;
48 }
49 return nullptr;
50 }
51};
52
53/* Code from ROOT tutorials/net/parallelMergeServer.C, reduced to handle TTrees only */
54
55struct ParallelFileMerger : public TObject
56{
57 TString fFilename;
58 TFileMerger fMerger;
59
60 ParallelFileMerger(const char *filename, int compress = ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault) : fFilename(filename), fMerger(kFALSE, kTRUE)
61 {
62 fMerger.OutputFile(filename, "RECREATE", compress);
63 }
64
68
69 ULong_t Hash() const
70 {
71 return fFilename.Hash();
72 }
73
74 const char* GetName() const
75 {
76 return fFilename;
77 }
78
79// Add missing branches to client tree and BackFill before merging
80 bool syncBranches(TTree* fromTree, TTree* toTree)
81 {
82 bool updated = false;
83 const TObjArray* fromBranches = fromTree->GetListOfBranches();
84 const TObjArray* toBranches = toTree->GetListOfBranches();
85 int nBranches = fromBranches->GetEntriesFast();
86 for (int k = 0; k < nBranches; ++k) {
87 TBranch* branch = static_cast<TBranch*>(fromBranches->UncheckedAt(k));
88 if (toBranches->FindObject(branch->GetName()) == nullptr) {
89 TBranch* newBranch = nullptr;
90 TClass* cl = TClass::GetClass(branch->GetClassName());
91 BranchDesc desc(cl);
92 void* empty = desc.dummyAddr();
93 if (strlen(branch->GetClassName()) > 0) {
94 newBranch = toTree->Branch(branch->GetName(), branch->GetClassName(), nullptr, branch->GetBasketSize(), branch->GetSplitLevel());
95 newBranch->SetAddress(empty);
96 } else {
97 TObjArray* outLeaves = branch->GetListOfLeaves();
98 TLeaf* leaf = static_cast<TLeaf*>(outLeaves->UncheckedAt(0));
99 std::string type = leaf->GetTypeName();
100 std::string attr = leaf->GetName();
101 if (type == "Int_t") type = attr + "/I";
102 else if (type == "Short_t") type = attr + "/S";
103 else if (type == "Long_t") type = attr + "/L";
104 else if (type == "UInt_t") type = attr + "/i";
105 else if (type == "UShort_t") type = attr + "/s";
106 else if (type == "ULong_t") type = attr + "/l";
107 else if (type == "Float_t") type = attr + "/F";
108 else if (type == "Double_t") type = attr + "/D";
109 else if (type == "Char_t") type = attr + "/B";
110 else if (type == "UChar_t") type = attr + "/b";
111 else if (type == "Bool_t") type = attr + "/O";
112 newBranch = toTree->Branch(branch->GetName(), static_cast<void*>(nullptr), type.c_str(), 2048);
113 }
114 int nEntries = toTree->GetEntries();
115 for (int m = 0; m < nEntries; ++m) {
116 newBranch->BackFill();
117 }
118 updated = true;
119 }
120 }
121 return updated;
122 }
123
124 Bool_t MergeTrees(TFile *input)
125 {
126 fMerger.AddFile(input);
127 TIter nextKey(input->GetListOfKeys());
128 while (TKey* key = static_cast<TKey*>(nextKey())) {
129 TClass* cl = TClass::GetClass(key->GetClassName());
130 if (cl != nullptr && cl->InheritsFrom("TTree")) {
131 TTree* outCollTree = static_cast<TTree*>(fMerger.GetOutputFile()->Get(key->GetName()));
132 TTree* inCollTree = static_cast<TTree*>(input->Get(key->GetName()));
133 if (inCollTree != nullptr && outCollTree != nullptr) {
134 if (syncBranches(outCollTree, inCollTree)) {
135 input->Write();
136 }
137 syncBranches(inCollTree, outCollTree);
138 }
139 }
140 }
141
142 Bool_t result = fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable | TFileMerger::kKeepCompression);
143 nextKey = input->GetListOfKeys();
144 while (TKey* key = static_cast<TKey*>(nextKey())) {
145 TClass* cl = TClass::GetClass(key->GetClassName());
146 if (cl != nullptr && 0 != cl->GetResetAfterMerge()) {
147 key->Delete();
148 input->GetListOfKeys()->Remove(key);
149 delete key;
150 }
151 }
152 return result;
153 }
154};
155
156//___________________________________________________________________________
157AthenaRootSharedWriterSvc::AthenaRootSharedWriterSvc(const std::string& name, ISvcLocator* pSvcLocator)
158 : base_class(name, pSvcLocator)
160}
161//___________________________________________________________________________
163 ATH_MSG_INFO("in initialize()");
164
165 // Initialize IConversionSvc
166 ATH_CHECK(m_cnvSvc.retrieve());
167 IProperty* propertyServer = dynamic_cast<IProperty*>(m_cnvSvc.get());
168 if (propertyServer == nullptr) {
169 ATH_MSG_ERROR("Unable to cast conversion service to IProperty");
170 return StatusCode::FAILURE;
171 } else {
172 std::string propertyName = "ParallelCompression";
173 bool parallelCompression(false);
174 BooleanProperty parallelCompressionProp(propertyName, parallelCompression);
175 if (propertyServer->getProperty(&parallelCompressionProp).isFailure()) {
176 ATH_MSG_INFO("Conversion service does not have ParallelCompression property");
177 } else if (parallelCompressionProp.value()) {
178 int streamPort = 0;
179 propertyName = "StreamPortString";
180 std::string streamPortString("");
181 StringProperty streamPortStringProp(propertyName, streamPortString);
182 if (propertyServer->getProperty(&streamPortStringProp).isFailure()) {
183 ATH_MSG_INFO("Conversion service does not have StreamPortString property, using default: " << streamPort);
184 } else {
185 streamPort = atoi(streamPortStringProp.value().substr(streamPortStringProp.value().find(':') + 1).c_str());
186 }
187 m_rootServerSocket = new TServerSocket(streamPort, (streamPort == 0 ? false : true), 100, -1, ESocketBindOption::kInaddrLoopback);
188 if (m_rootServerSocket == nullptr || !m_rootServerSocket->IsValid()) {
189 ATH_MSG_FATAL("Could not create ROOT TServerSocket: " << streamPort);
190 return StatusCode::FAILURE;
191 }
192 streamPort = m_rootServerSocket->GetLocalPort();
193 const std::string newStreamPortString{streamPortStringProp.value().substr(0,streamPortStringProp.value().find(':')+1) + std::to_string(streamPort)};
194 if (propertyServer->setProperty(propertyName,newStreamPortString).isFailure()) {
195 ATH_MSG_FATAL("Could not set Conversion Service property " << propertyName << " from " << streamPortString << " to " << newStreamPortString);
196 return StatusCode::FAILURE;
197 }
198 m_rootMonitor = new TMonitor;
200 ATH_MSG_DEBUG("Successfully created ROOT TServerSocket and added it to TMonitor: ready to accept connections, " << streamPort);
201 }
202 }
203 // Count the number of output streams
204 const IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
205 for (const auto& alg : algMgr->getAlgorithms()) {
206 if (alg->type() == "AthenaOutputStream") {
207 ATH_MSG_DEBUG("Counting " << alg->name() << " as an output stream algorithm");
209 }
210 }
211 if (m_numberOfStreams == 0) {
212 ATH_MSG_WARNING("No output stream algorithm found, setting the number of streams to 1");
214 } else {
215 ATH_MSG_INFO("Found a total of " << m_numberOfStreams << " output streams");
216 }
217
218 return StatusCode::SUCCESS;
219}
220//___________________________________________________________________________
221StatusCode AthenaRootSharedWriterSvc::share(int numClients, bool motherClient) {
222 ATH_MSG_DEBUG("Start commitOutput loop");
223 StatusCode sc = m_cnvSvc->commitOutput("", false);
224
225 // Allow ROOT clients to start up (by setting active clients)
226 // and wait to stop the ROOT server until all clients are done and metadata is written (commitOutput fail).
227 bool anyActiveClients = (m_rootServerSocket != nullptr);
228 while (sc.isSuccess() || sc.isRecoverable() || anyActiveClients) {
229 if (sc.isSuccess()) {
230 ATH_MSG_VERBOSE("Success in commitOutput loop");
231 } else if (m_rootMonitor != nullptr) {
232 TSocket* socket = m_rootMonitor->Select(1);
233 if (socket != nullptr && socket != (TSocket*)-1) {
234 ATH_MSG_DEBUG("ROOT Monitor got: " << socket);
235 if (socket->IsA() == TServerSocket::Class()) {
236 TSocket* client = (static_cast<TServerSocket*>(socket))->Accept();
237 client->Send(m_rootClientIndex, 0);
238 client->Send(1, 1);
241 if (m_rootClientCount < (numClients-1)*m_numberOfStreams + 1) {
242 m_rootMonitor->Add(client);
243 ATH_MSG_INFO("ROOT Monitor add client: " << m_rootClientIndex << ", " << client);
244 } else {
245 ATH_MSG_WARNING("ROOT Monitor do NOT add client: " << m_rootClientIndex << ", " << client);
246 client->Close("force");
248 }
249 } else {
250 TMessage* message = nullptr;
251 Int_t result = socket->Recv(message);
252 if (result < 0) {
253 ATH_MSG_ERROR("ROOT Monitor got an error while receiving the message from the socket: " << result);
254 return StatusCode::FAILURE;
255 }
256 if (message == nullptr) {
257 ATH_MSG_WARNING("ROOT Monitor got no message from socket: " << socket);
258 } else if (message->What() == kMESS_STRING) {
259 char str[64];
260 message->ReadString(str, 64);
261 ATH_MSG_INFO("ROOT Monitor client: " << socket << ", " << str);
262 m_rootMonitor->Remove(socket);
263 ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << socket->GetBytesRecv() << ", " << socket->GetBytesSent());
264 socket->Close();
266 if (m_rootMonitor->GetActive() == 0 || m_rootClientCount == 0) {
267 if (!motherClient) {
268 anyActiveClients = false;
269 ATH_MSG_INFO("ROOT Monitor: No more active clients...");
270 } else {
271 motherClient = false;
272 ATH_MSG_INFO("ROOT Monitor: Mother process is done...");
273 if (!m_cnvSvc->commitCatalog().isSuccess()) {
274 ATH_MSG_FATAL("Failed to commit file catalog.");
275 return StatusCode::FAILURE;
276 }
277 }
278 }
279 } else if (message->What() == kMESS_ANY) {
280 long long length;
281 TString filename;
282 int clientId;
283 message->ReadInt(clientId);
284 message->ReadTString(filename);
285 message->ReadLong64(length);
286 ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << clientId << ": " << filename << ", " << length);
287 std::unique_ptr<TMemFile> transient(new TMemFile(filename, message->Buffer() + message->Length(), length, "UPDATE"));
288 message->SetBufferOffset(message->Length() + length);
289 ParallelFileMerger* info = static_cast<ParallelFileMerger*>(m_rootMergers.FindObject(filename));
290 if (!info) {
291 info = new ParallelFileMerger(filename, transient->GetCompressionSettings());
292 m_rootMergers.Add(info);
293 ATH_MSG_INFO("ROOT Monitor ParallelFileMerger: " << info << ", for: " << filename);
294 }
295 info->MergeTrees(transient.get());
296 }
297 delete message; message = nullptr;
298 }
299 }
300 } else if (m_rootMonitor == nullptr) {
301 usleep(100);
302 }
303 // Once commitOutput failed all legacy clients are finished (writing metadata), do not call again.
304 if (sc.isSuccess() || sc.isRecoverable()) {
305 sc = m_cnvSvc->commitOutput("", false);
306 if (sc.isFailure() && !sc.isRecoverable()) {
307 ATH_MSG_INFO("commitOutput failed, metadata done.");
308 if (anyActiveClients && m_rootClientCount == 0) {
309 ATH_MSG_INFO("ROOT Monitor: No clients, terminating the loop...");
310 anyActiveClients = false;
311 }
312 }
313 }
314 }
315 ATH_MSG_INFO("End commitOutput loop");
316 return StatusCode::SUCCESS;
317}
318//___________________________________________________________________________
320 m_rootMergers.Delete();
321 return StatusCode::SUCCESS;
322}
323//___________________________________________________________________________
325 ATH_MSG_INFO("in finalize()");
326 delete m_rootMonitor; m_rootMonitor = nullptr;
327 delete m_rootServerSocket; m_rootServerSocket = nullptr;
328 return StatusCode::SUCCESS;
329}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
This file contains the class definition for the AthenaRootSharedWriterSvc class.
double length(const pvec &v)
static Double_t sc
static const Attributes_t empty
Templated class containing a cut, name of cut and description of cut(optional) Typically,...
Definition CutFlow.h:28
virtual StatusCode stop() override
virtual StatusCode finalize() override
virtual StatusCode share(int numClients=0, bool motherClient=false) override
virtual StatusCode initialize() override
Gaudi Service Interface method implementations:
AthenaRootSharedWriterSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
ServiceHandle< AthenaPoolSharedIOCnvSvc > m_cnvSvc
T * Get(TFile &f, const std::string &n, const std::string &dir="", const chainmap_t *chainmap=0, std::vector< std::string > *saved=0)
get a histogram given a path, and an optional initial directory if histogram is not found,...
Definition of a branch descriptor from RootTreeContainer.
std::unique_ptr< void, std::function< void(void *)> > dummy_ptr_t
std::unique_ptr< void, std::function< void(void *)> > dummyptr
bool syncBranches(TTree *fromTree, TTree *toTree)
ParallelFileMerger(const char *filename, int compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Bool_t MergeTrees(TFile *input)