ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaRootSharedWriterSvc Class Reference

This class provides an example for writing event data objects to Pool. More...

#include <AthenaRootSharedWriterSvc.h>

Inheritance diagram for AthenaRootSharedWriterSvc:
Collaboration diagram for AthenaRootSharedWriterSvc:

Public Member Functions

 AthenaRootSharedWriterSvc (const std::string &name, ISvcLocator *pSvcLocator)
 Standard Service Constructor.
virtual ~AthenaRootSharedWriterSvc ()=default
 Destructor.
virtual StatusCode initialize () override
 Gaudi Service Interface method implementations:
virtual StatusCode stop () override
virtual StatusCode finalize () override
virtual StatusCode share (int numClients=0, bool motherClient=false) override

Private Attributes

ServiceHandle< AthenaPoolSharedIOCnvSvc > m_cnvSvc {this,"AthenaPoolSharedIOCnvSvc","AthenaPoolSharedIOCnvSvc"}
TServerSocket * m_rootServerSocket
TMonitor * m_rootMonitor
THashTable m_rootMergers
int m_rootClientIndex
int m_rootClientCount
int m_numberOfStreams

Friends

class SvcFactory< AthenaRootSharedWriterSvc >

Detailed Description

This class provides an example for writing event data objects to Pool.

Definition at line 25 of file AthenaRootSharedWriterSvc.h.

Constructor & Destructor Documentation

◆ AthenaRootSharedWriterSvc()

AthenaRootSharedWriterSvc::AthenaRootSharedWriterSvc ( const std::string & name,
ISvcLocator * pSvcLocator )

◆ ~AthenaRootSharedWriterSvc()

virtual AthenaRootSharedWriterSvc::~AthenaRootSharedWriterSvc ( )
virtual default

Destructor.

Member Function Documentation

◆ finalize()

StatusCode AthenaRootSharedWriterSvc::finalize ( )
override virtual

Definition at line 324 of file AthenaRootSharedWriterSvc.cxx.

324 {
325 ATH_MSG_INFO("in finalize()");
326 delete m_rootMonitor; m_rootMonitor = nullptr;
327 delete m_rootServerSocket; m_rootServerSocket = nullptr;
328 return StatusCode::SUCCESS;
329}
#define ATH_MSG_INFO(x)

◆ initialize()

StatusCode AthenaRootSharedWriterSvc::initialize ( )
override virtual

Gaudi Service Interface method implementations:

Definition at line 162 of file AthenaRootSharedWriterSvc.cxx.

162 {
163 ATH_MSG_INFO("in initialize()");
164
165 // Initialize IConversionSvc
166 ATH_CHECK(m_cnvSvc.retrieve());
167 IProperty* propertyServer = dynamic_cast<IProperty*>(m_cnvSvc.get());
168 if (propertyServer == nullptr) {
169 ATH_MSG_ERROR("Unable to cast conversion service to IProperty");
170 return StatusCode::FAILURE;
171 } else {
172 std::string propertyName = "ParallelCompression";
173 bool parallelCompression(false);
174 BooleanProperty parallelCompressionProp(propertyName, parallelCompression);
175 if (propertyServer->getProperty(&parallelCompressionProp).isFailure()) {
176 ATH_MSG_INFO("Conversion service does not have ParallelCompression property");
177 } else if (parallelCompressionProp.value()) {
178 int streamPort = 0;
179 propertyName = "StreamPortString";
180 std::string streamPortString("");
181 StringProperty streamPortStringProp(propertyName, streamPortString);
182 if (propertyServer->getProperty(&streamPortStringProp).isFailure()) {
183 ATH_MSG_INFO("Conversion service does not have StreamPortString property, using default: " << streamPort);
184 } else {
185 streamPort = atoi(streamPortStringProp.value().substr(streamPortStringProp.value().find(':') + 1).c_str());
186 }
187 m_rootServerSocket = new TServerSocket(streamPort, (streamPort == 0 ? false : true), 100, -1, ESocketBindOption::kInaddrLoopback);
188 if (m_rootServerSocket == nullptr || !m_rootServerSocket->IsValid()) {
189 ATH_MSG_FATAL("Could not create ROOT TServerSocket: " << streamPort);
190 return StatusCode::FAILURE;
191 }
192 streamPort = m_rootServerSocket->GetLocalPort();
193 const std::string newStreamPortString{streamPortStringProp.value().substr(0,streamPortStringProp.value().find(':')+1) + std::to_string(streamPort)};
194 if (propertyServer->setProperty(propertyName,newStreamPortString).isFailure()) {
195 ATH_MSG_FATAL("Could not set Conversion Service property " << propertyName << " from " << streamPortString << " to " << newStreamPortString);
196 return StatusCode::FAILURE;
197 }
198 m_rootMonitor = new TMonitor;
200 ATH_MSG_DEBUG("Successfully created ROOT TServerSocket and added it to TMonitor: ready to accept connections, " << streamPort);
201 }
202 }
203 // Count the number of output streams
204 const IAlgManager* algMgr = Gaudi::svcLocator()->as<IAlgManager>();
205 for (const auto& alg : algMgr->getAlgorithms()) {
206 if (alg->type() == "AthenaOutputStream") {
207 ATH_MSG_DEBUG("Counting " << alg->name() << " as an output stream algorithm");
209 }
210 }
211 if (m_numberOfStreams == 0) {
212 ATH_MSG_WARNING("No output stream algorithm found, setting the number of streams to 1");
214 } else {
215 ATH_MSG_INFO("Found a total of " << m_numberOfStreams << " output streams");
216 }
217
218 return StatusCode::SUCCESS;
219}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_WARNING(x)
#define ATH_MSG_DEBUG(x)
ServiceHandle< AthenaPoolSharedIOCnvSvc > m_cnvSvc
int atoi(std::string_view str)
Helper functions to unpack numbers decoded in string into integers and doubles The strings are requir...

◆ share()

StatusCode AthenaRootSharedWriterSvc::share ( int numClients = 0,
bool motherClient = false )
override virtual

Definition at line 221 of file AthenaRootSharedWriterSvc.cxx.

221 {
222 ATH_MSG_DEBUG("Start commitOutput loop");
223 StatusCode sc = m_cnvSvc->commitOutput("", false);
224
225 // Allow ROOT clients to start up (by setting active clients)
226 // and wait to stop the ROOT server until all clients are done and metadata is written (commitOutput fail).
227 bool anyActiveClients = (m_rootServerSocket != nullptr);
228 while (sc.isSuccess() || sc.isRecoverable() || anyActiveClients) {
229 if (sc.isSuccess()) {
230 ATH_MSG_VERBOSE("Success in commitOutput loop");
231 } else if (m_rootMonitor != nullptr) {
232 TSocket* socket = m_rootMonitor->Select(1);
233 if (socket != nullptr && socket != (TSocket*)-1) {
234 ATH_MSG_DEBUG("ROOT Monitor got: " << socket);
235 if (socket->IsA() == TServerSocket::Class()) {
236 TSocket* client = (static_cast<TServerSocket*>(socket))->Accept();
237 client->Send(m_rootClientIndex, 0);
238 client->Send(1, 1);
241 if (m_rootClientCount < (numClients-1)*m_numberOfStreams + 1) {
242 m_rootMonitor->Add(client);
243 ATH_MSG_INFO("ROOT Monitor add client: " << m_rootClientIndex << ", " << client);
244 } else {
245 ATH_MSG_WARNING("ROOT Monitor do NOT add client: " << m_rootClientIndex << ", " << client);
246 client->Close("force");
248 }
249 } else {
250 TMessage* message = nullptr;
251 Int_t result = socket->Recv(message);
252 if (result < 0) {
253 ATH_MSG_ERROR("ROOT Monitor got an error while receiving the message from the socket: " << result);
254 return StatusCode::FAILURE;
255 }
256 if (message == nullptr) {
257 ATH_MSG_WARNING("ROOT Monitor got no message from socket: " << socket);
258 } else if (message->What() == kMESS_STRING) {
259 char str[64];
260 message->ReadString(str, 64);
261 ATH_MSG_INFO("ROOT Monitor client: " << socket << ", " << str);
262 m_rootMonitor->Remove(socket);
263 ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << socket->GetBytesRecv() << ", " << socket->GetBytesSent());
264 socket->Close();
266 if (m_rootMonitor->GetActive() == 0 || m_rootClientCount == 0) {
267 if (!motherClient) {
268 anyActiveClients = false;
269 ATH_MSG_INFO("ROOT Monitor: No more active clients...");
270 } else {
271 motherClient = false;
272 ATH_MSG_INFO("ROOT Monitor: Mother process is done...");
273 if (!m_cnvSvc->commitCatalog().isSuccess()) {
274 ATH_MSG_FATAL("Failed to commit file catalog.");
275 return StatusCode::FAILURE;
276 }
277 }
278 }
279 } else if (message->What() == kMESS_ANY) {
280 long long length;
281 TString filename;
282 int clientId;
283 message->ReadInt(clientId);
284 message->ReadTString(filename);
285 message->ReadLong64(length);
286 ATH_MSG_DEBUG("ROOT Monitor client: " << socket << ", " << clientId << ": " << filename << ", " << length);
287 std::unique_ptr<TMemFile> transient(new TMemFile(filename, message->Buffer() + message->Length(), length, "UPDATE"));
288 message->SetBufferOffset(message->Length() + length);
289 ParallelFileMerger* info = static_cast<ParallelFileMerger*>(m_rootMergers.FindObject(filename));
290 if (!info) {
291 info = new ParallelFileMerger(filename, transient->GetCompressionSettings());
292 m_rootMergers.Add(info);
293 ATH_MSG_INFO("ROOT Monitor ParallelFileMerger: " << info << ", for: " << filename);
294 }
295 info->MergeTrees(transient.get());
296 }
297 delete message; message = nullptr;
298 }
299 }
300 } else if (m_rootMonitor == nullptr) {
301 usleep(100);
302 }
303 // Once commitOutput failed all legacy clients are finished (writing metadata), do not call again.
304 if (sc.isSuccess() || sc.isRecoverable()) {
305 sc = m_cnvSvc->commitOutput("", false);
306 if (sc.isFailure() && !sc.isRecoverable()) {
307 ATH_MSG_INFO("commitOutput failed, metadata done.");
308 if (anyActiveClients && m_rootClientCount == 0) {
309 ATH_MSG_INFO("ROOT Monitor: No clients, terminating the loop...");
310 anyActiveClients = false;
311 }
312 }
313 }
314 }
315 ATH_MSG_INFO("End commitOutput loop");
316 return StatusCode::SUCCESS;
317}
#define ATH_MSG_VERBOSE(x)
double length(const pvec &v)
static Double_t sc
::StatusCode StatusCode
StatusCode definition for legacy code.

◆ stop()

StatusCode AthenaRootSharedWriterSvc::stop ( )
override virtual

Definition at line 319 of file AthenaRootSharedWriterSvc.cxx.

319 {
320 m_rootMergers.Delete();
321 return StatusCode::SUCCESS;
322}

◆ SvcFactory< AthenaRootSharedWriterSvc >

friend class SvcFactory< AthenaRootSharedWriterSvc >
friend

Definition at line 1 of file AthenaRootSharedWriterSvc.h.

Member Data Documentation

◆ m_cnvSvc

ServiceHandle<AthenaPoolSharedIOCnvSvc> AthenaRootSharedWriterSvc::m_cnvSvc {this,"AthenaPoolSharedIOCnvSvc","AthenaPoolSharedIOCnvSvc"}
private

Definition at line 44 of file AthenaRootSharedWriterSvc.h.

44{this,"AthenaPoolSharedIOCnvSvc","AthenaPoolSharedIOCnvSvc"};

◆ m_numberOfStreams

int AthenaRootSharedWriterSvc::m_numberOfStreams
private

Definition at line 51 of file AthenaRootSharedWriterSvc.h.

◆ m_rootClientCount

int AthenaRootSharedWriterSvc::m_rootClientCount
private

Definition at line 50 of file AthenaRootSharedWriterSvc.h.

◆ m_rootClientIndex

int AthenaRootSharedWriterSvc::m_rootClientIndex
private

Definition at line 49 of file AthenaRootSharedWriterSvc.h.

◆ m_rootMergers

THashTable AthenaRootSharedWriterSvc::m_rootMergers
private

Definition at line 48 of file AthenaRootSharedWriterSvc.h.

◆ m_rootMonitor

TMonitor* AthenaRootSharedWriterSvc::m_rootMonitor
private

Definition at line 47 of file AthenaRootSharedWriterSvc.h.

◆ m_rootServerSocket

TServerSocket* AthenaRootSharedWriterSvc::m_rootServerSocket
private

Definition at line 46 of file AthenaRootSharedWriterSvc.h.


The documentation for this class was generated from the following files: