10#include "GaudiKernel/IAlgManager.h"
22#include "TServerSocket.h"
// Type-erased owning pointer: a unique_ptr<void> whose deleter is a
// std::function<void(void*)>, so the deleter can be supplied at run time.
34 using dummy_ptr_t = std::unique_ptr<void, std::function<void(
void*)> >;
// Member holding the dummy object; its type is spelled out in full here but
// is the same type as dummy_ptr_t above.
35 std::unique_ptr<void, std::function<void(
void*)> >
dummyptr;
// Pointer to the member function TClass::Destructor(void*, Bool_t).
43 void(TClass::*dxtor)(
void*, Bool_t) = &TClass::Destructor;
// Bind it to `clazz` with the Bool_t argument fixed to false, which yields a
// one-argument callable usable as the unique_ptr deleter.
44 std::function<void(
void*)> del = std::bind(dxtor,
clazz, std::placeholders::_1,
false);
// Create an object with clazz->New() and hand it, together with the deleter,
// to dummyptr.
// NOTE(review): `clazz` is dereferenced here; a null check is not visible in
// this part of the listing — confirm one exists in the omitted lines.
45 dummyptr = std::unique_ptr<void, std::function<void(
void*)> >(
clazz->New(), std::move(del));
// Point fMerger at the output file `filename`, using option "RECREATE" and
// the compression setting `compress`.
62 fMerger.OutputFile(filename,
"RECREATE", compress);
// Fragment of the branch synchronisation between fromTree and toTree (the
// function header and several lines are not part of this listing).
// For every branch of fromTree that has no branch of the same name in toTree,
// a new branch is created in toTree and back-filled once per existing entry.
83 const TObjArray* fromBranches = fromTree->GetListOfBranches();
84 const TObjArray* toBranches = toTree->GetListOfBranches();
85 int nBranches = fromBranches->GetEntriesFast();
86 for (
int k = 0; k < nBranches; ++k) {
87 TBranch* branch =
static_cast<TBranch*
>(fromBranches->UncheckedAt(k));
// Only branches that are missing from the destination (looked up by name)
// are handled.
88 if (toBranches->FindObject(branch->GetName()) ==
nullptr) {
89 TBranch* newBranch =
nullptr;
90 TClass* cl = TClass::GetClass(branch->GetClassName());
// `empty` is the address obtained from desc.dummyAddr(); `desc` is declared
// in a line that is not visible here.
92 void*
empty = desc.dummyAddr();
// Branch with a non-empty class name: create a branch of that class with the
// same basket size and split level, then set its address to `empty`.
93 if (strlen(branch->GetClassName()) > 0) {
94 newBranch = toTree->Branch(branch->GetName(), branch->GetClassName(),
nullptr, branch->GetBasketSize(), branch->GetSplitLevel());
95 newBranch->SetAddress(
empty);
// Presumably the alternative (no class name) case — the `else` line itself is
// not visible. Build a leaf list "<leaf name>/<type code>" from the first
// leaf of the source branch.
97 TObjArray* outLeaves = branch->GetListOfLeaves();
98 TLeaf* leaf =
static_cast<TLeaf*
>(outLeaves->UncheckedAt(0));
99 std::string
type = leaf->GetTypeName();
100 std::string attr = leaf->GetName();
// Map the leaf's type name to a one-letter type code. There is no final
// else: if no case matches, `type` keeps the raw type name.
// NOTE(review): Long64_t / ULong64_t are not among the handled names —
// confirm whether they can occur.
101 if (
type ==
"Int_t")
type = attr +
"/I";
102 else if (
type ==
"Short_t")
type = attr +
"/S";
103 else if (
type ==
"Long_t")
type = attr +
"/L";
104 else if (
type ==
"UInt_t")
type = attr +
"/i";
105 else if (
type ==
"UShort_t")
type = attr +
"/s";
106 else if (
type ==
"ULong_t")
type = attr +
"/l";
107 else if (
type ==
"Float_t")
type = attr +
"/F";
108 else if (
type ==
"Double_t")
type = attr +
"/D";
109 else if (
type ==
"Char_t")
type = attr +
"/B";
110 else if (
type ==
"UChar_t")
type = attr +
"/b";
111 else if (
type ==
"Bool_t")
type = attr +
"/O";
// Create the branch with a null address, the leaf list built above and a
// buffer size of 2048.
112 newBranch = toTree->Branch(branch->GetName(),
static_cast<void*
>(
nullptr),
type.c_str(), 2048);
// Back-fill the new branch once for every entry already present in toTree.
// NOTE(review): the entry count is stored in an int — confirm it cannot
// exceed the int range.
114 int nEntries = toTree->GetEntries();
115 for (
int m = 0; m < nEntries; ++m) {
116 newBranch->BackFill();
// Fragment of the tree-merging routine operating on `input` (function header
// and several lines are not part of this listing).
// First pass: iterate over all keys of the input file.
127 TIter nextKey(input->GetListOfKeys());
128 while (TKey* key =
static_cast<TKey*
>(nextKey())) {
129 TClass* cl = TClass::GetClass(key->GetClassName());
// For keys whose class inherits from TTree, fetch the object with the same
// name from both the merger's output file and the input.
130 if (cl !=
nullptr && cl->InheritsFrom(
"TTree")) {
131 TTree* outCollTree =
static_cast<TTree*
>(
fMerger.GetOutputFile()->
Get(key->GetName()));
132 TTree* inCollTree =
static_cast<TTree*
>(input->Get(key->GetName()));
// Both trees exist; what is done with them is in lines not visible here.
133 if (inCollTree !=
nullptr && outCollTree !=
nullptr) {
// Run a partial merge with the incremental, resetable and keep-compression
// flags combined.
142 Bool_t
result =
fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable | TFileMerger::kKeepCompression);
// Second pass over the input keys: restart the iterator on the key list.
143 nextKey = input->GetListOfKeys();
144 while (TKey* key =
static_cast<TKey*
>(nextKey())) {
145 TClass* cl = TClass::GetClass(key->GetClassName());
// Keys whose class reports a non-zero GetResetAfterMerge() are removed from
// the input's key list.
// NOTE(review): the list is modified while nextKey iterates over it —
// confirm this is safe for TIter.
146 if (cl !=
nullptr && 0 != cl->GetResetAfterMerge()) {
148 input->GetListOfKeys()->Remove(key);
// Constructor initializer: forwards name and pSvcLocator to base_class.
158 : base_class(name, pSvcLocator)
// Fragment of what appears to be the service initialisation (header not
// visible): the conversion service is accessed through IProperty so that its
// properties can be read and written; failure of the cast is an error.
167 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_cnvSvc.get());
168 if (propertyServer ==
nullptr) {
169 ATH_MSG_ERROR(
"Unable to cast conversion service to IProperty");
170 return StatusCode::FAILURE;
// Prepare a BooleanProperty named "ParallelCompression", initialised from a
// local bool that is false.
172 std::string propertyName =
"ParallelCompression";
173 bool parallelCompression(
false);
174 BooleanProperty parallelCompressionProp(propertyName, parallelCompression);
175 if (propertyServer->getProperty(&parallelCompressionProp).isFailure()) {
// The property could not be retrieved: only an informational message is
// logged in this branch.
176 ATH_MSG_INFO(
"Conversion service does not have ParallelCompression property");
177 }
else if (parallelCompressionProp.value()) {
// Parallel compression is switched on: query the "StreamPortString" property
// (propertyName is reused for the new name).
179 propertyName =
"StreamPortString";
180 std::string streamPortString(
"");
181 StringProperty streamPortStringProp(propertyName, streamPortString);
182 if (propertyServer->getProperty(&streamPortStringProp).isFailure()) {
183 ATH_MSG_INFO(
"Conversion service does not have StreamPortString property, using default: " << streamPort);
// Port number = the text after the first ':' of the property value, parsed
// with atoi. If the value has no ':', find() returns npos, npos + 1 wraps to
// 0 and the whole string is parsed.
185 streamPort = atoi(streamPortStringProp.value().substr(streamPortStringProp.value().find(
':') + 1).c_str());
// Create the server socket on streamPort. The second argument is false only
// when streamPort is 0; the remaining arguments are 100, -1 and the loopback
// bind option.
// NOTE(review): allocated with raw new and stored in m_rootServerSocket;
// where it is released is not visible in this listing.
187 m_rootServerSocket =
new TServerSocket(streamPort, (streamPort == 0 ?
false :
true), 100, -1, ESocketBindOption::kInaddrLoopback);
189 ATH_MSG_FATAL(
"Could not create ROOT TServerSocket: " << streamPort);
190 return StatusCode::FAILURE;
// Rebuild the property value: the original text up to and including ':'
// followed by the current value of streamPort, then write it back to the
// conversion service.
193 const std::string newStreamPortString{streamPortStringProp.value().substr(0,streamPortStringProp.value().find(
':')+1) + std::to_string(streamPort)};
194 if (propertyServer->setProperty(propertyName,newStreamPortString).isFailure()) {
// NOTE(review): the "from" value printed below is the local streamPortString
// (initialised to "" above), not streamPortStringProp.value() — confirm this
// is intended.
195 ATH_MSG_FATAL(
"Could not set Conversion Service property " << propertyName <<
" from " << streamPortString <<
" to " << newStreamPortString);
196 return StatusCode::FAILURE;
200 ATH_MSG_DEBUG(
"Successfully created ROOT TServerSocket and added it to TMonitor: ready to accept connections, " << streamPort);
// Walk the algorithms known to the algorithm manager and pick out those whose
// type is "AthenaOutputStream"; per the messages below they are counted as
// output streams, with a fallback of 1 when none is found.
204 const IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
205 for (
const auto& alg : algMgr->getAlgorithms()) {
206 if (alg->type() ==
"AthenaOutputStream") {
207 ATH_MSG_DEBUG(
"Counting " << alg->name() <<
" as an output stream algorithm");
212 ATH_MSG_WARNING(
"No output stream algorithm found, setting the number of streams to 1");
218 return StatusCode::SUCCESS;
// Fragment of the client-serving loop (function header and many lines are not
// part of this listing). The status returned by commitOutput("", false) on
// the conversion service drives the loop condition below.
223 StatusCode
sc =
m_cnvSvc->commitOutput(
"",
false);
// Keep looping while the last status is success or recoverable, or while
// clients are still flagged as active.
228 while (
sc.isSuccess() ||
sc.isRecoverable() || anyActiveClients) {
229 if (
sc.isSuccess()) {
// A socket value of nullptr or (TSocket*)-1 is treated as "nothing to
// handle".
233 if (socket !=
nullptr && socket != (TSocket*)-1) {
// The socket is a TServerSocket: accept the incoming connection.
235 if (socket->IsA() == TServerSocket::Class()) {
236 TSocket* client = (
static_cast<TServerSocket*
>(socket))->
Accept();
// The client connection is force-closed (the condition leading here is not
// visible in this listing).
246 client->Close(
"force");
// Receive a message from the socket into `message`.
250 TMessage* message =
nullptr;
251 Int_t
result = socket->Recv(message);
// A receive error is reported and ends the call with FAILURE (the test on
// `result` is not visible here).
253 ATH_MSG_ERROR(
"ROOT Monitor got an error while receiving the message from the socket: " <<
result);
254 return StatusCode::FAILURE;
// Dispatch on the message: none received, a string message, or kMESS_ANY.
256 if (message ==
nullptr) {
257 ATH_MSG_WARNING(
"ROOT Monitor got no message from socket: " << socket);
258 }
else if (message->What() == kMESS_STRING) {
// String message: read at most 64 characters into `str` and log the socket's
// received/sent byte counts.
260 message->ReadString(
str, 64);
263 ATH_MSG_DEBUG(
"ROOT Monitor client: " << socket <<
", " << socket->GetBytesRecv() <<
", " << socket->GetBytesSent());
// Bookkeeping once clients are gone (the surrounding conditions are not
// visible here).
268 anyActiveClients =
false;
269 ATH_MSG_INFO(
"ROOT Monitor: No more active clients...");
271 motherClient =
false;
272 ATH_MSG_INFO(
"ROOT Monitor: Mother process is done...");
// A failing commitCatalog() on the conversion service ends the call with
// FAILURE.
273 if (!
m_cnvSvc->commitCatalog().isSuccess()) {
275 return StatusCode::FAILURE;
279 }
else if (message->What() == kMESS_ANY) {
// kMESS_ANY message: read the client id, the file name and the payload
// length, in that order.
283 message->ReadInt(clientId);
284 message->ReadTString(filename);
285 message->ReadLong64(
length);
286 ATH_MSG_DEBUG(
"ROOT Monitor client: " << socket <<
", " << clientId <<
": " << filename <<
", " <<
length);
// Build a TMemFile (option "UPDATE") over the `length` bytes that start at
// the current read position of the message buffer, then move the buffer
// offset past those bytes.
287 std::unique_ptr<TMemFile> transient(
new TMemFile(filename, message->Buffer() + message->Length(),
length,
"UPDATE"));
288 message->SetBufferOffset(message->Length() +
length);
293 ATH_MSG_INFO(
"ROOT Monitor ParallelFileMerger: " << info <<
", for: " << filename);
// Merge the in-memory file through `info` (its lookup or creation is in lines
// not visible here).
295 info->MergeTrees(transient.get());
// Delete the received message and clear the pointer.
297 delete message; message =
nullptr;
304 if (
sc.isSuccess() ||
sc.isRecoverable()) {
// Status is a failure that is not recoverable.
306 if (
sc.isFailure() && !
sc.isRecoverable()) {
309 ATH_MSG_INFO(
"ROOT Monitor: No clients, terminating the loop...");
310 anyActiveClients =
false;
316 return StatusCode::SUCCESS;
// Stray return statements from two further function bodies whose other lines
// are not part of this listing (presumably stop() and finalize() — confirm).
321 return StatusCode::SUCCESS;
328 return StatusCode::SUCCESS;
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_VERBOSE(x)
#define ATH_MSG_WARNING(x)
This file contains the class definition for the AthenaRootSharedWriterSvc class.
static const Attributes_t empty
Templated class containing a cut, name of cut and description of cut (optional). Typically,...
virtual StatusCode stop() override
virtual StatusCode finalize() override
virtual StatusCode share(int numClients=0, bool motherClient=false) override
virtual StatusCode initialize() override
Gaudi Service Interface method implementations:
AthenaRootSharedWriterSvc(const std::string &name, ISvcLocator *pSvcLocator)
Standard Service Constructor.
TServerSocket * m_rootServerSocket
ServiceHandle< AthenaPoolSharedIOCnvSvc > m_cnvSvc
T * Get(TFile &f, const std::string &n, const std::string &dir="", const chainmap_t *chainmap=0, std::vector< std::string > *saved=0)
get a histogram given a path, and an optional initial directory if histogram is not found,...
Definition of a branch descriptor from RootTreeContainer.
std::unique_ptr< void, std::function< void(void *)> > dummy_ptr_t
std::unique_ptr< void, std::function< void(void *)> > dummyptr
const char * GetName() const
bool syncBranches(TTree *fromTree, TTree *toTree)
ParallelFileMerger(const char *filename, int compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Bool_t MergeTrees(TFile *input)