10 #include "GaudiKernel/IAlgManager.h"
22 #include "TServerSocket.h"
// Type-erased owning pointer: holds a void* and releases it through a
// std::function<void(void*)> deleter.
34 using dummy_ptr_t = std::unique_ptr<void, std::function<void(
void*)> >;
// Owning handle with the same type as dummy_ptr_t, spelled out in full.
35 std::unique_ptr<void, std::function<void(
void*)> >
dummyptr;
// Pointer to the member function TClass::Destructor(void*, Bool_t).
43 void(TClass::*dxtor)(
void*, Bool_t) = &TClass::Destructor;
// Deleter: invokes clazz->Destructor(ptr, false) on the pointer it receives.
// NOTE(review): the meaning of the `false` flag is defined by ROOT's
// TClass::Destructor, not by this file — confirm against the ROOT reference.
44 std::function<void(
void*)> del = std::bind(dxtor,
clazz, std::placeholders::_1,
false);
// Create an instance through clazz->New() and give ownership to dummyptr,
// which will release it through the deleter built above.
45 dummyptr = std::unique_ptr<void, std::function<void(
void*)> >(
clazz->New(), std::move(del));
// Fragment: walks the branches of fromTree and, for each one that has no
// same-named object in toTree's branch list, creates a branch in toTree.
// The enclosing function header, several intermediate lines and the closing
// braces are outside this excerpt.
83 const TObjArray* fromBranches = fromTree->GetListOfBranches();
84 const TObjArray* toBranches = toTree->GetListOfBranches();
// The GetEntriesFast() result is used as the loop bound.
85 int nBranches = fromBranches->GetEntriesFast();
86 for (
int k = 0;
k < nBranches; ++
k) {
87 TBranch*
branch =
static_cast<TBranch*
>(fromBranches->UncheckedAt(
k));
// Only act when toTree has no object registered under this branch name.
88 if (toBranches->FindObject(
branch->GetName()) ==
nullptr) {
89 TBranch* newBranch =
nullptr;
// TClass lookup for the branch's class name; how `cl` is used is not visible here.
90 TClass*
cl = TClass::GetClass(
branch->GetClassName());
// Non-empty class name: create the branch from the class name with a null
// object address, reusing the source branch's basket size and split level.
93 if (strlen(
branch->GetClassName()) > 0) {
94 newBranch = toTree->Branch(
branch->GetName(),
branch->GetClassName(),
nullptr,
branch->GetBasketSize(),
branch->GetSplitLevel());
// Point the new branch at `empty`, which is declared outside the visible lines.
95 newBranch->SetAddress(
empty);
// Leaf-typed path: take the first leaf of the source branch and build a
// leaflist descriptor from it.
// NOTE(review): the `else` separating this from the class-name path is not
// visible in this excerpt — confirm.
97 TObjArray* outLeaves =
branch->GetListOfLeaves();
98 TLeaf* leaf =
static_cast<TLeaf*
>(outLeaves->UncheckedAt(0));
99 std::string
type = leaf->GetTypeName();
100 std::string attr = leaf->GetName();
// Map the leaf's type name to "<leaf name>/<type code>". When no case matches,
// `type` keeps the raw type name unchanged.
101 if (
type ==
"Int_t")
type = attr +
"/I";
102 else if (
type ==
"Short_t")
type = attr +
"/S";
103 else if (
type ==
"Long_t")
type = attr +
"/L";
104 else if (
type ==
"UInt_t")
type = attr +
"/i";
105 else if (
type ==
"UShort_t")
type = attr +
"/s";
106 else if (
type ==
"ULong_t")
type = attr +
"/l";
107 else if (
type ==
"Float_t")
type = attr +
"/F";
108 else if (
type ==
"Double_t")
type = attr +
"/D";
109 else if (
type ==
"Char_t")
type = attr +
"/B";
110 else if (
type ==
"UChar_t")
type = attr +
"/b";
111 else if (
type ==
"Bool_t")
type = attr +
"/O";
// Create the branch with a null address, the descriptor built above and a
// buffer size of 2048.
112 newBranch = toTree->Branch(
branch->GetName(),
static_cast<void*
>(
nullptr),
type.c_str(), 2048);
// Entry count of toTree; how it is used is not visible in this excerpt.
114 int nEntries = toTree->GetEntries();
// NOTE(review): presumably brings the new branch up to the entries already in
// the tree — confirm against ROOT's TBranch::BackFill documentation.
116 newBranch->BackFill();
// Fragment: first pass over the keys of `input`, pairing TTree objects in the
// input with same-named trees in the merger's output file. The enclosing
// function header and the lines between the passes are outside this excerpt.
127 TIter nextKey(
input->GetListOfKeys());
128 while (TKey*
key =
static_cast<TKey*
>(nextKey())) {
// Only keys whose class is known and inherits from TTree are considered.
129 TClass*
cl = TClass::GetClass(
key->GetClassName());
130 if (
cl !=
nullptr &&
cl->InheritsFrom(
"TTree")) {
// Fetch the tree with this key name from the merger output file and from the input.
131 TTree* outCollTree =
static_cast<TTree*
>(
fMerger.GetOutputFile()->Get(
key->GetName()));
132 TTree* inCollTree =
static_cast<TTree*
>(
input->Get(
key->GetName()));
// Both trees must exist; the body of this branch is not visible here.
133 if (inCollTree !=
nullptr && outCollTree !=
nullptr) {
// Merge with the incremental, resetable and keep-compression flags combined.
142 Bool_t
result =
fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable | TFileMerger::kKeepCompression);
// Second pass: restart the iterator on the input's key list.
143 nextKey =
input->GetListOfKeys();
144 while (TKey*
key =
static_cast<TKey*
>(nextKey())) {
145 TClass*
cl = TClass::GetClass(
key->GetClassName());
// Keys whose class reports a non-zero GetResetAfterMerge() are removed from
// the input's key list.
146 if (
cl !=
nullptr && 0 !=
cl->GetResetAfterMerge()) {
148 input->GetListOfKeys()->Remove(
key);
// Member-initializer list (the constructor signature is outside this excerpt):
// forwards name and pSvcLocator to base_class, starts the server socket and
// monitor pointers as nullptr, default-constructs m_rootMergers, and zeroes the
// client index, client count and number of streams.
158 : base_class(
name, pSvcLocator)
159 , m_rootServerSocket(nullptr), m_rootMonitor(nullptr), m_rootMergers(), m_rootClientIndex(0), m_rootClientCount(0), m_numberOfStreams(0) {
// Obtain the IProperty interface of the conversion service held in m_cnvSvc.
// dynamic_cast yields nullptr when the object does not implement IProperty.
167 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_cnvSvc.get());
// Without the interface the properties below cannot be read: log and fail.
168 if (propertyServer ==
nullptr) {
169 ATH_MSG_ERROR(
"Unable to cast conversion service to IProperty");
170 return StatusCode::FAILURE;
// Query the conversion service for its "ParallelCompression" property.
// The local property object starts from a default value of false.
172 std::string propertyName =
"ParallelCompression";
173 bool parallelCompression(
false);
174 BooleanProperty parallelCompressionProp(propertyName, parallelCompression);
// getProperty() receives the address of the property object to fill, the same
// way the StreamPortString lookup passes &streamPortStringProp.
// A failed lookup is only reported at INFO level and is not treated as an error.
175 if (propertyServer->getProperty(&parallelCompressionProp).isFailure()) {
176 ATH_MSG_INFO(
"Conversion service does not have ParallelCompression property");
177 }
// The socket setup that follows runs only when the property is present and true.
else if (parallelCompressionProp.value()) {
// Fragment: reads the "StreamPortString" property, opens the listening socket
// and counts output stream algorithms. The declaration of streamPort and
// several conditions/closing braces are outside this excerpt.
179 propertyName =
"StreamPortString";
180 std::string streamPortString(
"");
181 StringProperty streamPortStringProp(propertyName, streamPortString);
// A missing property is not an error: the existing streamPort value is kept.
182 if (propertyServer->getProperty(&streamPortStringProp).isFailure()) {
183 ATH_MSG_INFO(
"Conversion service does not have StreamPortString property, using default: " << streamPort);
// Parse the port number from the text after the first ':' in the property value.
// If no ':' is present, find() returns npos, npos + 1 wraps to 0 and the whole
// string is handed to atoi.
185 streamPort =
atoi(streamPortStringProp.value().substr(streamPortStringProp.value().find(
':') + 1).c_str());
// Open the server socket on streamPort. The second argument is true unless
// streamPort is 0.
// NOTE(review): the second and third arguments are presumably the reuse flag and
// backlog of ROOT's TServerSocket constructor — confirm against the ROOT reference.
187 m_rootServerSocket =
new TServerSocket(streamPort, (streamPort == 0 ?
false :
true), 100);
// Failure path for socket creation; the condition guarding it is not visible here.
189 ATH_MSG_FATAL(
"Could not create ROOT TServerSocket: " << streamPort);
190 return StatusCode::FAILURE;
// Rebuild the property value: everything up to and including the first ':'
// followed by the numeric streamPort, then write it back to the conversion service.
193 const std::string newStreamPortString{streamPortStringProp.value().substr(0,streamPortStringProp.value().find(
':')+1) +
std::to_string(streamPort)};
194 if (propertyServer->setProperty(propertyName,newStreamPortString).isFailure()) {
195 ATH_MSG_FATAL(
"Could not set Conversion Service property " << propertyName <<
" from " << streamPortString <<
" to " << newStreamPortString);
196 return StatusCode::FAILURE;
200 ATH_MSG_DEBUG(
"Successfully created ROOT TServerSocket and added it to TMonitor: ready to accept connections, " << streamPort);
// Walk all algorithms known to the algorithm manager and pick out those whose
// type is "AthenaOutputStream"; the counter update itself is not visible here.
204 const IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
205 for (
const auto&
alg : algMgr->getAlgorithms()) {
206 if (
alg->type() ==
"AthenaOutputStream") {
207 ATH_MSG_DEBUG(
"Counting " <<
alg->name() <<
" as an output stream algorithm");
// Fallback warning when no output stream was found (condition not visible here).
212 ATH_MSG_WARNING(
"No output stream algorithm found, setting the number of streams to 1");
218 return StatusCode::SUCCESS;
// Fragment of the monitor loop: keeps running while the last status `sc` is
// success or recoverable, or while any client is still marked active. The
// enclosing function header and many intermediate lines are outside this excerpt.
228 while (
sc.isSuccess() ||
sc.isRecoverable() || anyActiveClients) {
229 if (
sc.isSuccess()) {
// Proceed only with a usable socket: neither null nor the (TSocket*)-1 value.
// NOTE(review): (TSocket*)-1 is presumably a sentinel returned by the monitor's
// select call — confirm.
233 if (socket !=
nullptr && socket != (TSocket*)-1) {
// A socket whose runtime class is TServerSocket is the listening socket:
// accept the incoming client connection on it.
235 if (socket->IsA() == TServerSocket::Class()) {
236 TSocket*
client = ((TServerSocket*)socket)->Accept();
// Receive error: log `result` and abort with FAILURE.
253 ATH_MSG_ERROR(
"ROOT Monitor got an error while receiving the message from the socket: " <<
result);
254 return StatusCode::FAILURE;
// Logged when no message was obtained from the socket (condition not visible here).
257 ATH_MSG_WARNING(
"ROOT Monitor got no message from socket: " << socket);
258 }
else if (
message->What() == kMESS_STRING) {
// String message branch: log the socket together with its received and sent byte counts.
263 ATH_MSG_DEBUG(
"ROOT Monitor client: " << socket <<
", " << socket->GetBytesRecv() <<
", " << socket->GetBytesSent());
// Record that no clients remain active.
268 anyActiveClients =
false;
269 ATH_MSG_INFO(
"ROOT Monitor: No more active clients...");
// Record that the mother process has finished, then commit the catalog.
// A failed commit aborts with FAILURE.
271 motherClient =
false;
272 ATH_MSG_INFO(
"ROOT Monitor: Mother process is done...");
273 if (!
m_cnvSvc->commitCatalog().isSuccess()) {
275 return StatusCode::FAILURE;
279 }
else if (
message->What() == kMESS_ANY) {
// kMESS_ANY branch: hand the object held by `transient` to info->MergeTrees().
295 info->MergeTrees(
transient.
get());
// Status checks whose bodies are not visible in this excerpt.
304 if (
sc.isSuccess() ||
sc.isRecoverable()) {
306 if (
sc.isFailure() && !
sc.isRecoverable()) {
// Clear the active-client flag so the loop condition can become false.
309 ATH_MSG_INFO(
"ROOT Monitor: No clients, terminating the loop...");
310 anyActiveClients =
false;
316 return StatusCode::SUCCESS;
321 return StatusCode::SUCCESS;
328 return StatusCode::SUCCESS;