ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaHDFStreamTool.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
4
5/* file contains the implementation for the AthenaHDFStreamTool class.
6 * @author Peter van Gemmeren <gemmeren@anl.gov>
7 **/
8
10
11#include "GaudiKernel/FileIncident.h"
12
13#include "StorageSvc/DbReflex.h"
15
16#include "H5Cpp.h"
17#include "H5File.h"
18#include "H5Group.h"
19
20static const char* const fmt_oid = "[OID=%08lX%08lX-%016llX]";
21static const char* const fmt_aux = "[AUX=%08lX]";
22
23namespace{
24 void
25 stringBefore(std::string & s, char sc){
26 auto n = s.find(sc);
27 if (n!=std::string::npos) s.resize(n);
28 return;
29 }
30}
31
32//___________________________________________________________________________
34 const std::string& name,
35 const IInterface* parent) : base_class(type, name, parent),
36 m_file(nullptr),
37 m_group(nullptr),
38 m_token(""),
39 m_read_data(nullptr),
40 m_read_size(0),
42 m_event_iter(0),
43 m_isClient(false),
44 m_incidentSvc("IncidentSvc", name) {
45}
46
47//___________________________________________________________________________
50
51//___________________________________________________________________________
53 if (!::AthAlgTool::initialize().isSuccess()) {
54 ATH_MSG_FATAL("Cannot initialize AthAlgTool base class.");
55 return(StatusCode::FAILURE);
56 }
57 // Retrieve IncidentSvc
58 if (!m_incidentSvc.retrieve().isSuccess()) {
59 ATH_MSG_FATAL("Cannot get IncidentSvc");
60 return(StatusCode::FAILURE);
61 }
62 return(StatusCode::SUCCESS);
63}
64
65//___________________________________________________________________________
67 ATH_MSG_INFO("in finalize()");
68 return(::AthAlgTool::finalize());
69}
70
71//___________________________________________________________________________
72StatusCode AthenaHDFStreamTool::makeServer(int/* num*/, const std::string& /*streamPortSuffix*/) {
73 ATH_MSG_ERROR("AthenaHDFStreamTool::makeServer");
74 return(StatusCode::FAILURE);
75}
76
77//___________________________________________________________________________
79 return(false);
80}
81
82//___________________________________________________________________________
83StatusCode AthenaHDFStreamTool::makeClient(int num, std::string& /*streamPortSuffix*/) {
84 ATH_MSG_INFO("AthenaHDFStreamTool::makeClient: " << num);
85
86 if (num > 0) {
87 m_file = new H5::H5File( "test.h5", H5F_ACC_TRUNC ); //FIXME, hardcoded filename
88 m_group = new H5::Group(m_file->createGroup("data"));
89 } else {
90 m_file = new H5::H5File( "test.h5", H5F_ACC_RDONLY );
91 m_group = new H5::Group(m_file->openGroup("data"));
92 }
93 m_isClient = true;
94 return(StatusCode::SUCCESS);
95}
96
97//___________________________________________________________________________
99 return(m_isClient);
100}
101
102//___________________________________________________________________________
103StatusCode AthenaHDFStreamTool::putEvent(long/* eventNumber*/, const void* /* source*/, std::size_t/* nbytes*/, unsigned int/* status*/) const {
104 ATH_MSG_ERROR("AthenaHDFStreamTool::putEvent");
105 return(StatusCode::FAILURE);
106}
107
108//___________________________________________________________________________
109StatusCode AthenaHDFStreamTool::getLockedEvent(void** target, unsigned int&/* status*/) const {
110 ATH_MSG_INFO("AthenaHDFStreamTool::getLockedEvent");
111 const std::string dh_entry = "POOLContainer(DataHeader)_entry";
112 H5::DataSet dataset = m_group->openDataSet(dh_entry);
113 if (m_event_iter + 1 >= dataset.getInMemDataSize()/8) { // End of File
114 m_inputFileGuard.reset(); // fires EndInputFile
115 ATH_MSG_INFO("AthenaHDFStreamTool::getLockedEvent: no more events = " << m_event_iter);
116 return(StatusCode::RECOVERABLE);
117 }
118
119 const hsize_t offset[1] = {m_event_iter};
120 H5::DataSpace filespace = dataset.getSpace();
121 const hsize_t mem_size[1] = {2};
122 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
123 H5::DataSpace memspace(1, mem_size);
124 long long unsigned int ds_data[2] = {0, 0};
125 dataset.read(ds_data, H5::PredType::NATIVE_ULLONG, memspace, filespace);
126 std::size_t nbytes = ds_data[1] - ds_data[0];
127 m_token = "[DB=00000000-0000-0000-0000-000000000000][CNT=POOLContainer(DataHeader)][CLID=4DDBD295-EFCE-472A-9EC8-15CD35A9EB8D][TECH=00000401]";
128 char text[64];
129 sprintf(text, fmt_oid, 0ul, nbytes, m_event_iter);
130 text[40] = 0;
131 m_token += text;
132
133 const int length = m_token.size() + 2; //FIXME: copy token
134 *target = new char[length];
135 std::memcpy(static_cast<char*>(*target), m_token.c_str(), length - 1);
136 target[length - 1] = 0;
137 return(StatusCode::SUCCESS);
138}
139
140//___________________________________________________________________________
141StatusCode AthenaHDFStreamTool::lockEvent(long eventNumber) const {
142 ATH_MSG_VERBOSE("AthenaHDFStreamTool::lockEvent: " << eventNumber);
143 m_event_iter = eventNumber;
144 if (eventNumber == 0) {
146 "HDF:test.h5", {}); //FIXME, hardcoded filename
147 }
148 return(StatusCode::SUCCESS);
149}
150
151//___________________________________________________________________________
152StatusCode AthenaHDFStreamTool::putObject(const void* source, std::size_t nbytes, int/* num*/) {
153 if (nbytes == 0 || m_token.empty()) {
154 return(StatusCode::SUCCESS);
155 }
156 ATH_MSG_INFO("AthenaHDFStreamTool::putObject: source = " << source << ", nbytes = " << nbytes);
157
158 if (m_token.find("[CONT=") != std::string::npos) m_token.replace(m_token.find("[CONT="), 6, "[CNT=");
159 std::string ds_name = m_token.substr(m_token.find("[CNT=") + 5);
160 stringBefore(ds_name,']');
161 while (ds_name.find("/") != std::string::npos) { ds_name = ds_name.replace(ds_name.find("/"), 1, "_"); }
162
163 m_token.replace(m_token.find("[TECH="), 15, "[TECH=00000401]");
164 std::string className = m_token.substr(m_token.find("[PNAME=") + 7);
165 stringBefore(className, ']');
166
167 long long unsigned int positionCount = 0;
168 if (m_token.find("[CLID=") == std::string::npos) { // Core object
169 m_token += "[CLID=" + pool::DbReflex::guid(RootType::ByNameNoQuiet(className)).toString() + "]";
170 } else { // Aux Store extension
171 char text[64];
172 sprintf(text, fmt_aux, nbytes);
173 text[15] = 0;
174 H5::DataSet dataset = m_group->openDataSet(ds_name);
175 const hsize_t offset[1] = {dataset.getInMemDataSize()};
176 positionCount = offset[0];
177 const hsize_t ds_size[1] = {offset[0] + 15};
178 dataset.extend(ds_size);
179 H5::DataSpace filespace = dataset.getSpace();
180 const hsize_t mem_size[1] = {15};
181 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
182 H5::DataSpace memspace(1, mem_size);
183 dataset.write(text, H5::PredType::NATIVE_CHAR, memspace, filespace);
184 }
185
186// Write Payload data
187 if (!m_group->exists(ds_name)) { //if dataset doesn't exist, create it otherwise extend it
188 const hsize_t maxdim[1] = {H5S_UNLIMITED};
189 const hsize_t ds_size[1] = {nbytes};
190 H5::DataSpace filespace(1, ds_size, maxdim);
191 H5::DSetCreatPropList ds_prop;
192 hsize_t chunkdim[1] = {nbytes};
193 if( ds_name.starts_with(APRDefaults::WriteConfig::getEventDataName()) ||
194 ds_name.starts_with(APRDefaults::WriteConfig::getEventTagName()) ) {
195 if (nbytes < 512) {
196 chunkdim[0] = 4096;
197 } else if (nbytes < 16 * 512) {
198 chunkdim[0] = 4 * 4096;
199 } else {
200 chunkdim[0] = (hsize_t(nbytes / 4096) + 1) * 4096;
201 }
202 }
203 ds_prop.setChunk(1, chunkdim);
204 char fill_val[1] = {0};
205 ds_prop.setFillValue(H5::PredType::NATIVE_CHAR, fill_val);
206 H5::DataSet dataset = m_group->createDataSet(ds_name, H5::PredType::NATIVE_CHAR, filespace, ds_prop);
207 dataset.write(source, H5::PredType::NATIVE_CHAR);
208 } else {
209 H5::DataSet dataset = m_group->openDataSet(ds_name);
210 const hsize_t offset[1] = {dataset.getInMemDataSize()};
211 positionCount = offset[0];
212 const hsize_t ds_size[1] = {offset[0] + nbytes};
213 dataset.extend(ds_size);
214 H5::DataSpace filespace = dataset.getSpace();
215 const hsize_t mem_size[1] = {nbytes};
216 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
217 H5::DataSpace memspace(1, mem_size);
218 dataset.write(source, H5::PredType::NATIVE_CHAR, memspace, filespace);
219 }
220 if (m_token.find("[OID=") == std::string::npos) { // Core object
221 char text[64];
222 sprintf(text, fmt_oid, 0ul, nbytes, positionCount);
223 text[40] = 0;
224 m_token += text;
225 } else {
226 char text[64];
227 std::size_t firstU, firstL;
228 long long unsigned int second;
229 ::sscanf(m_token.substr(m_token.find("[OID="), 40).c_str(), fmt_oid, &firstU, &firstL, &second);
230 if (firstU == 0ul) { //FIXME1
231 firstU = firstL; // Keep Core object size
232 }
233 firstL = positionCount + nbytes - second;
234 sprintf(text, fmt_oid, firstU, firstL, second); // FIXME
235 text[40] = 0;
236 m_token.replace(m_token.find("[OID="), 39, text);
237 }
238
239 std::string entry_name = ds_name.substr(ds_name.find('(') + 1);
240 stringBefore(entry_name,')');
241// For DataHeader, store entry point
242 if (entry_name == "DataHeader" || entry_name == "DataHeaderForm") {
243 auto dh_entry = ds_name + "_entry";
244 if (!m_group->exists(dh_entry)) {
245 const hsize_t maxdim[1] = {H5S_UNLIMITED};
246 const hsize_t ds_size[1] = {2};
247 H5::DataSpace filespace(1, ds_size, maxdim);
248 H5::DSetCreatPropList ds_prop;
249 const hsize_t chunkdim[1] = {512};
250 ds_prop.setChunk(1, chunkdim);
251 char fill_val[1] = {0};
252 ds_prop.setFillValue(H5::PredType::NATIVE_ULLONG, fill_val);
253 H5::DataSet dataset = m_group->createDataSet(dh_entry, H5::PredType::NATIVE_ULLONG, filespace, ds_prop);
254 long long unsigned int ds_data[2] = {positionCount, positionCount + nbytes};
255 dataset.write(ds_data, H5::PredType::NATIVE_ULLONG);
256 } else {
257 H5::DataSet dataset = m_group->openDataSet(dh_entry);
258 const hsize_t offset[1] = {dataset.getInMemDataSize()/8};//FIXME
259 const hsize_t ds_size[1] = {offset[0] + 1};
260 dataset.extend(ds_size);
261 H5::DataSpace filespace = dataset.getSpace();
262 const hsize_t mem_size[1] = {1};
263 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
264 H5::DataSpace memspace(1, mem_size);
265 long long unsigned int ds_data[1] = {positionCount + nbytes};
266 dataset.write(ds_data, H5::PredType::NATIVE_ULLONG, memspace, filespace);
267 }
268 if (entry_name == "DataHeader") {
269 auto dh_form_entry = ds_name + "_form_entry";
270 if (!m_group->exists(dh_form_entry)) {
271 const hsize_t maxdim[1] = {H5S_UNLIMITED};
272 const hsize_t ds_size[1] = {1};
273 H5::DataSpace filespace(1, ds_size, maxdim);
274 H5::DSetCreatPropList ds_prop;
275 const hsize_t chunkdim[1] = {512};
276 ds_prop.setChunk(1, chunkdim);
277 char fill_val[1] = {0};
278 ds_prop.setFillValue(H5::PredType::NATIVE_ULLONG, fill_val);
279 H5::DataSet dataset = m_group->createDataSet(dh_form_entry, H5::PredType::NATIVE_ULLONG, filespace, ds_prop);
280 long long unsigned int ds_data[1] = {0};
281 dataset.write(ds_data, H5::PredType::NATIVE_ULLONG);
282 } else {
283 H5::DataSet dataset = m_group->openDataSet(dh_form_entry);
284 const hsize_t offset[1] = {dataset.getInMemDataSize()/8};//FIXME
285 const hsize_t ds_size[1] = {offset[0] + 1};
286 dataset.extend(ds_size);
287 H5::DataSpace filespace = dataset.getSpace();
288 const hsize_t mem_size[1] = {1};
289 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
290 H5::DataSpace memspace(1, mem_size);
291 auto dh_form_entry_name = ds_name.substr(0, ds_name.find('(')) + "Form(DataHeaderForm)_entry";
292 H5::DataSet dh_form_dataset = m_group->openDataSet(dh_form_entry_name);
293 long long unsigned int ds_data[1] = {dh_form_dataset.getInMemDataSize()/8 - 1};//FIXME
294 dataset.write(ds_data, H5::PredType::NATIVE_ULLONG, memspace, filespace);
295 }
296 }
297 if (entry_name == "DataHeaderForm") {
298 auto dh_entry_name = ds_name.substr(0, ds_name.find('(') - 4) + "(DataHeader)_form_entry";
299 H5::DataSet dataset = m_group->openDataSet(dh_entry_name);
300 const hsize_t offset[1] = {dataset.getInMemDataSize()/8 - 1};//FIXME
301 H5::DataSpace filespace = dataset.getSpace();
302 const hsize_t mem_size[1] = {1};
303 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
304 H5::DataSpace memspace(1, mem_size);
305 H5::DataSet dh_form_dataset = m_group->openDataSet(ds_name + "_entry");
306 long long unsigned int ds_data[1] = {dh_form_dataset.getInMemDataSize()/8 - 1};//FIXME
307 dataset.write(ds_data, H5::PredType::NATIVE_ULLONG, memspace, filespace);
308 }
309 }
310 return(StatusCode::SUCCESS);
311}
312
313//___________________________________________________________________________
314StatusCode AthenaHDFStreamTool::getObject(void** target, std::size_t& nbytes, int/* num*/) {
315 if (m_token.empty()) {
316 return(StatusCode::SUCCESS);
317 }
318 ATH_MSG_INFO("AthenaHDFStreamTool::getObject: token = " << m_token);
319
320 std::string clid_name = m_token.substr(m_token.find("[CLID=") + 6);
321 stringBefore(clid_name,']');
322 std::string ds_name = m_token.substr(m_token.find("[CNT=") + 5);
323 stringBefore(ds_name, ']');
324 if (ds_name.empty()) {
325 return(StatusCode::SUCCESS);
326 }
327 while (ds_name.find('/') != std::string::npos) { ds_name = ds_name.replace(ds_name.find('/'), 1, "_"); }
328
329 std::string oid_name = m_token.substr(m_token.find("[OID="));
330 auto n = oid_name.find(']') + 1;
331 oid_name.resize(n);
332 std::size_t firstU, firstL;
333 long long unsigned int second;
334 ::sscanf(oid_name.c_str(), fmt_oid, &firstU, &firstL, &second);
335 if (m_read_size > m_read_position + 15) { // aux store data already read
336 std::size_t aux_size = 0;
337 ::sscanf(m_read_data + m_read_position, fmt_aux, &aux_size);
338 m_read_position += 15;
339 *target = m_read_data + m_read_position;
340 nbytes = aux_size;
341 m_read_position += nbytes;
342 char text[64];
343 sprintf(text, fmt_oid, m_read_position, firstL, second); // FIXME
344 text[40] = 0;
345 m_token.replace(m_token.find("[OID="), 39, text);
346 return(StatusCode::SUCCESS);
347 } else if (m_read_size > 0) {
348 return(StatusCode::FAILURE);
349 }
350
351 std::string entry_name = ds_name.substr(ds_name.find('(') + 1);
352 stringBefore(entry_name,')');
353// For DataHeader, get stored size
354 if (entry_name == "DataHeader") {
355 if (clid_name == "7BE56CEF-C866-4BEE-9348-A5F34B5F1DAD") { // DataHeaderForm Token is copied from DataHeader, change container name
356 ds_name.replace(ds_name.find("(DataHeader)"), 12, "Form(DataHeaderForm)");
357 second = m_event_iter; //FIXME, store real DHF id somewhere...
358 } else if (clid_name == "00000000-0000-0000-0000-000000000000") { // Return DataHeader Token, for createAddress
359 if (firstL > 0) { //FIXME1
360 m_token.clear();
361 } else {
362 m_token.replace(m_token.find("[CLID="), 43, "[CLID=4DDBD295-EFCE-472A-9EC8-15CD35A9EB8D]");
363 m_token.replace(m_token.find("[TECH="), 15, "[TECH=00000401]");
364 }
365 nbytes = m_token.size();
366 *target = const_cast<char*>(m_token.c_str());//FIXME
367 return(StatusCode::SUCCESS);
368 } else {
369 m_event_iter = second;
370 }
371
372 auto dh_entry = ds_name + "_entry";
373 H5::DataSet dataset = m_group->openDataSet(dh_entry);
374 if (second + 1 >= dataset.getInMemDataSize()/8) {
375 return(StatusCode::FAILURE);
376 }
377 const hsize_t offset[1] = {second};
378 H5::DataSpace filespace = dataset.getSpace();
379 const hsize_t mem_size[1] = {2};
380 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
381 H5::DataSpace memspace(1, mem_size);
382 long long unsigned int ds_data[2] = {0, 0};
383 dataset.read(ds_data, H5::PredType::NATIVE_ULLONG, memspace, filespace);
384 firstL = ds_data[1] - ds_data[0]; //FIXME1
385 second = ds_data[0];
386 }
387
388 if (!m_group->exists(ds_name)) {
389 return(StatusCode::FAILURE);
390 }
391 H5::DataSet dataset = m_group->openDataSet(ds_name);
392 if (second + firstL > dataset.getInMemDataSize()) {
393 return(StatusCode::FAILURE);
394 }
395 const hsize_t offset[1] = {second};
396 H5::DataSpace filespace = dataset.getSpace();
397 const hsize_t mem_size[1] = {firstL};
398 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
399 H5::DataSpace memspace(1, mem_size);
400 m_read_data = new char[firstL];
401 m_read_size = firstL;
402 dataset.read(m_read_data, H5::PredType::NATIVE_CHAR, memspace, filespace);
403 *target = m_read_data;
404 if (firstU > 0) {
405 ::sscanf(m_read_data + firstL - 15, fmt_aux, &nbytes);
406 } else {
407 nbytes = firstL;
408 }
409 m_read_position = nbytes;
410 return(StatusCode::SUCCESS);
411}
412
413//___________________________________________________________________________
414StatusCode AthenaHDFStreamTool::clearObject(const char** tokenString, int&/* num*/) {
415 std::size_t firstU, firstL;
416 long long unsigned int second;
417 ::sscanf(m_token.substr(m_token.find("[OID="), 40).c_str(), fmt_oid, &firstU, &firstL, &second);
418 std::string ds_name = m_token.substr(m_token.find("[CNT=") + 5);
419 stringBefore(ds_name, ']');
420 while (ds_name.find('/') != std::string::npos) { ds_name = ds_name.replace(ds_name.find('/'), 1, "_"); }
421
422 if (firstU > 0 || ds_name.substr(ds_name.length() - 5, 4) == "Aux.") {
423 if (firstU == 0) firstU = firstL;
424 char text[64];
425 sprintf(text, fmt_aux, firstU);
426 text[15] = 0;
427 H5::DataSet dataset = m_group->openDataSet(ds_name);
428 const hsize_t offset[1] = {dataset.getInMemDataSize()};
429 const hsize_t ds_size[1] = {offset[0] + 15};
430 dataset.extend(ds_size);
431 H5::DataSpace filespace = dataset.getSpace();
432 const hsize_t mem_size[1] = {15};
433 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
434 H5::DataSpace memspace(1, mem_size);
435 dataset.write(text, H5::PredType::NATIVE_CHAR, memspace, filespace);
436 firstL += 15;
437 firstU = 1ul;
438 sprintf(text, fmt_oid, firstU, firstL, second); // FIXME
439 text[40] = 0;
440 m_token.replace(m_token.find("[OID="), 39, text);
441 }
442 // Return an empty token string for DataHeaderForm, to indicate HDF5 can't update DataHeader after it was written.
443 std::string entry_name = ds_name.substr(ds_name.find('(') + 1);
444 stringBefore(entry_name, ')');
445 if (entry_name == "DataHeaderForm") {
446 m_token.clear();
447 }
448 *tokenString = m_token.c_str();
449 return(StatusCode::SUCCESS);
450}
451
452//___________________________________________________________________________
453StatusCode AthenaHDFStreamTool::lockObject(const char* tokenString, int/* num*/) {
454 m_token = tokenString;
455 delete [] m_read_data; m_read_data = nullptr;
456 m_read_size = 0;
457 m_read_position = 0;
458 return(StatusCode::SUCCESS);
459}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
static const char *const fmt_oid
static const char *const fmt_aux
This file contains the class definition for the AthenaHDFStreamTool class.
double length(const pvec &v)
static Double_t sc
virtual ~AthenaHDFStreamTool()
Destructor.
StatusCode lockObject(const char *tokenString, int num=0)
long long unsigned int m_event_iter
StatusCode getObject(void **target, std::size_t &nbytes, int num=0)
AthenaHDFStreamTool(const std::string &type, const std::string &name, const IInterface *parent)
Standard Service Constructor.
StatusCode clearObject(const char **tokenString, int &num)
ServiceHandle< IIncidentSvc > m_incidentSvc
StatusCode makeClient(int num, std::string &streamPortSuffix)
StatusCode initialize()
Gaudi Service Interface method implementations:
StatusCode getLockedEvent(void **target, unsigned int &status) const
StatusCode putObject(const void *source, std::size_t nbytes, int num=0)
StatusCode makeServer(int num, const std::string &streamPortSuffix)
std::optional< InputFileIncidentGuard > m_inputFileGuard
StatusCode lockEvent(long eventNumber) const
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
static InputFileIncidentGuard begin(IIncidentSvc &incSvc, std::string_view source, std::string_view beginFileName, std::string_view guid, std::string_view endFileName={}, std::string_view beginType=IncidentType::BeginInputFile, std::string_view endType=IncidentType::EndInputFile)
Factory: fire the begin incident and return a guard whose destructor fires the matching end incident.
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition RootType.cxx:586
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
const char * getEventDataName()
const char * getEventTagName()