10 #include "GaudiKernel/IAlgManager.h"
22 #include "TServerSocket.h"
34 using dummy_ptr_t = std::unique_ptr<void, std::function<void(
void*)> >;
35 std::unique_ptr<void, std::function<void(
void*)> >
dummyptr;
43 void(TClass::*dxtor)(
void*, Bool_t) = &TClass::Destructor;
44 std::function<void(
void*)> del = std::bind(dxtor,
clazz, std::placeholders::_1,
false);
45 dummyptr = std::unique_ptr<void, std::function<void(
void*)> >(
clazz->New(), std::move(del));
83 const TObjArray* fromBranches = fromTree->GetListOfBranches();
84 const TObjArray* toBranches = toTree->GetListOfBranches();
85 int nBranches = fromBranches->GetEntriesFast();
86 for (
int k = 0;
k < nBranches; ++
k) {
87 TBranch*
branch =
static_cast<TBranch*
>(fromBranches->UncheckedAt(
k));
88 if (toBranches->FindObject(
branch->GetName()) ==
nullptr) {
89 TBranch* newBranch =
nullptr;
90 TClass*
cl = TClass::GetClass(
branch->GetClassName());
94 if (strlen(
branch->GetClassName()) > 0) {
95 newBranch = toTree->Branch(
branch->GetName(),
branch->GetClassName(),
nullptr,
branch->GetBasketSize(),
branch->GetSplitLevel());
96 newBranch->SetAddress(
empty);
98 TObjArray* outLeaves =
branch->GetListOfLeaves();
99 TLeaf* leaf =
static_cast<TLeaf*
>(outLeaves->UncheckedAt(0));
100 std::string
type = leaf->GetTypeName();
101 std::string attr = leaf->GetName();
102 if (
type ==
"Int_t")
type = attr +
"/I";
103 else if (
type ==
"Short_t")
type = attr +
"/S";
104 else if (
type ==
"Long_t")
type = attr +
"/L";
105 else if (
type ==
"UInt_t")
type = attr +
"/i";
106 else if (
type ==
"UShort_t")
type = attr +
"/s";
107 else if (
type ==
"UShort_t")
type = attr +
"/s";
108 else if (
type ==
"Float_t")
type = attr +
"/F";
109 else if (
type ==
"Double_t")
type = attr +
"/D";
110 else if (
type ==
"Char_t")
type = attr +
"/B";
111 else if (
type ==
"UChar_t")
type = attr +
"/b";
112 else if (
type ==
"Bool_t")
type = attr +
"/O";
113 newBranch = toTree->Branch(
branch->GetName(),
buff,
type.c_str(), 2048);
115 int nEntries = toTree->GetEntries();
117 newBranch->BackFill();
128 TIter nextKey(
input->GetListOfKeys());
129 while (TKey*
key =
static_cast<TKey*
>(nextKey())) {
130 TClass*
cl = TClass::GetClass(
key->GetClassName());
131 if (
cl !=
nullptr &&
cl->InheritsFrom(
"TTree")) {
132 TTree* outCollTree =
static_cast<TTree*
>(
fMerger.GetOutputFile()->Get(
key->GetName()));
133 TTree* inCollTree =
static_cast<TTree*
>(
input->Get(
key->GetName()));
134 if (inCollTree !=
nullptr && outCollTree !=
nullptr) {
143 Bool_t
result =
fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable | TFileMerger::kKeepCompression);
144 nextKey =
input->GetListOfKeys();
145 while (TKey*
key =
static_cast<TKey*
>(nextKey())) {
146 TClass*
cl = TClass::GetClass(
key->GetClassName());
147 if (
cl !=
nullptr && 0 !=
cl->GetResetAfterMerge()) {
149 input->GetListOfKeys()->Remove(
key);
159 : base_class(
name, pSvcLocator)
160 , m_rootServerSocket(nullptr), m_rootMonitor(nullptr), m_rootMergers(), m_rootClientIndex(0), m_rootClientCount(0), m_numberOfStreams(0) {
168 IProperty* propertyServer =
dynamic_cast<IProperty*
>(
m_cnvSvc.get());
169 if (propertyServer ==
nullptr) {
170 ATH_MSG_ERROR(
"Unable to cast conversion service to IProperty");
171 return StatusCode::FAILURE;
173 std::string propertyName =
"ParallelCompression";
174 bool parallelCompression(
false);
175 BooleanProperty parallelCompressionProp(propertyName, parallelCompression);
176 if (propertyServer->getProperty(¶llelCompressionProp).isFailure()) {
177 ATH_MSG_INFO(
"Conversion service does not have ParallelCompression property");
178 }
else if (parallelCompressionProp.value()) {
180 propertyName =
"StreamPortString";
181 std::string streamPortString(
"");
182 StringProperty streamPortStringProp(propertyName, streamPortString);
183 if (propertyServer->getProperty(&streamPortStringProp).isFailure()) {
184 ATH_MSG_INFO(
"Conversion service does not have StreamPortString property, using default: " << streamPort);
186 streamPort =
atoi(streamPortStringProp.value().substr(streamPortStringProp.value().find(
':') + 1).c_str());
188 m_rootServerSocket =
new TServerSocket(streamPort, (streamPort == 0 ?
false :
true), 100);
190 ATH_MSG_FATAL(
"Could not create ROOT TServerSocket: " << streamPort);
191 return StatusCode::FAILURE;
194 const std::string newStreamPortString{streamPortStringProp.value().substr(0,streamPortStringProp.value().find(
':')+1) +
std::to_string(streamPort)};
195 if (propertyServer->setProperty(propertyName,newStreamPortString).isFailure()) {
196 ATH_MSG_FATAL(
"Could not set Conversion Service property " << propertyName <<
" from " << streamPortString <<
" to " << newStreamPortString);
197 return StatusCode::FAILURE;
201 ATH_MSG_DEBUG(
"Successfully created ROOT TServerSocket and added it to TMonitor: ready to accept connections, " << streamPort);
205 const IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
206 for (
const auto&
alg : algMgr->getAlgorithms()) {
207 if (
alg->type() ==
"AthenaOutputStream") {
208 ATH_MSG_DEBUG(
"Counting " <<
alg->name() <<
" as an output stream algorithm");
213 ATH_MSG_WARNING(
"No output stream algorithm found, setting the number of streams to 1");
219 return StatusCode::SUCCESS;
229 while (
sc.isSuccess() ||
sc.isRecoverable() || anyActiveClients) {
230 if (
sc.isSuccess()) {
234 if (socket !=
nullptr && socket != (TSocket*)-1) {
236 if (socket->IsA() == TServerSocket::Class()) {
237 TSocket*
client = ((TServerSocket*)socket)->Accept();
254 ATH_MSG_ERROR(
"ROOT Monitor got an error while receiving the message from the socket: " <<
result);
255 return StatusCode::FAILURE;
258 ATH_MSG_WARNING(
"ROOT Monitor got no message from socket: " << socket);
259 }
else if (
message->What() == kMESS_STRING) {
264 ATH_MSG_DEBUG(
"ROOT Monitor client: " << socket <<
", " << socket->GetBytesRecv() <<
", " << socket->GetBytesSent());
269 anyActiveClients =
false;
270 ATH_MSG_INFO(
"ROOT Monitor: No more active clients...");
272 motherClient =
false;
273 ATH_MSG_INFO(
"ROOT Monitor: Mother process is done...");
274 if (!
m_cnvSvc->commitCatalog().isSuccess()) {
276 return StatusCode::FAILURE;
280 }
else if (
message->What() == kMESS_ANY) {
296 info->MergeTrees(
transient.
get());
305 if (
sc.isSuccess() ||
sc.isRecoverable()) {
307 if (
sc.isFailure() && !
sc.isRecoverable()) {
310 ATH_MSG_INFO(
"ROOT Monitor: No clients, terminating the loop...");
311 anyActiveClients =
false;
317 return StatusCode::SUCCESS;
322 return StatusCode::SUCCESS;
329 return StatusCode::SUCCESS;