ATLAS Offline Software
Loading...
Searching...
No Matches
AthenaHDFStreamTool.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3*/
4
5/* file contains the implementation for the AthenaHDFStreamTool class.
6 * @author Peter van Gemmeren <gemmeren@anl.gov>
7 **/
8
10
11#include "GaudiKernel/FileIncident.h"
12
13#include "StorageSvc/DbReflex.h"
15
16#include "H5Cpp.h"
17#include "H5File.h"
18#include "H5Group.h"
19
20static const char* const fmt_oid = "[OID=%08lX%08lX-%016llX]";
21static const char* const fmt_aux = "[AUX=%08lX]";
22
namespace {
   /// Cut @p s off in place right before the first occurrence of @p sc.
   /// When @p sc does not occur in @p s, the string is left unchanged.
   void stringBefore(std::string& s, char sc) {
      const std::string::size_type cut = s.find(sc);
      if (cut == std::string::npos) {
         return;
      }
      s.erase(cut);
   }
}
31
32//___________________________________________________________________________
34 const std::string& name,
35 const IInterface* parent) : base_class(type, name, parent),
36 m_file(nullptr),
37 m_group(nullptr),
38 m_token(""),
39 m_read_data(nullptr),
40 m_read_size(0),
42 m_event_iter(0),
43 m_isClient(false),
44 m_incidentSvc("IncidentSvc", name) {
45}
46
47//___________________________________________________________________________
50
51//___________________________________________________________________________
53 if (!::AthAlgTool::initialize().isSuccess()) {
54 ATH_MSG_FATAL("Cannot initialize AthAlgTool base class.");
55 return(StatusCode::FAILURE);
56 }
57 // Retrieve IncidentSvc
58 if (!m_incidentSvc.retrieve().isSuccess()) {
59 ATH_MSG_FATAL("Cannot get IncidentSvc");
60 return(StatusCode::FAILURE);
61 }
62 return(StatusCode::SUCCESS);
63}
64
65//___________________________________________________________________________
67 ATH_MSG_INFO("in finalize()");
68 return(::AthAlgTool::finalize());
69}
70
71//___________________________________________________________________________
72StatusCode AthenaHDFStreamTool::makeServer(int/* num*/, const std::string& /*streamPortSuffix*/) {
73 ATH_MSG_ERROR("AthenaHDFStreamTool::makeServer");
74 return(StatusCode::FAILURE);
75}
76
77//___________________________________________________________________________
79 return(false);
80}
81
82//___________________________________________________________________________
83StatusCode AthenaHDFStreamTool::makeClient(int num, std::string& /*streamPortSuffix*/) {
84 ATH_MSG_INFO("AthenaHDFStreamTool::makeClient: " << num);
85
86 if (num > 0) {
87 m_file = new H5::H5File( "test.h5", H5F_ACC_TRUNC ); //FIXME, hardcoded filename
88 m_group = new H5::Group(m_file->createGroup("data"));
89 } else {
90 m_file = new H5::H5File( "test.h5", H5F_ACC_RDONLY );
91 m_group = new H5::Group(m_file->openGroup("data"));
92 }
93 m_isClient = true;
94 return(StatusCode::SUCCESS);
95}
96
97//___________________________________________________________________________
99 return(m_isClient);
100}
101
102//___________________________________________________________________________
103StatusCode AthenaHDFStreamTool::putEvent(long/* eventNumber*/, const void* /* source*/, std::size_t/* nbytes*/, unsigned int/* status*/) const {
104 ATH_MSG_ERROR("AthenaHDFStreamTool::putEvent");
105 return(StatusCode::FAILURE);
106}
107
108//___________________________________________________________________________
109StatusCode AthenaHDFStreamTool::getLockedEvent(void** target, unsigned int&/* status*/) const {
110 ATH_MSG_INFO("AthenaHDFStreamTool::getLockedEvent");
111 const std::string dh_entry = "POOLContainer(DataHeader)_entry";
112 H5::DataSet dataset = m_group->openDataSet(dh_entry);
113 if (m_event_iter + 1 >= dataset.getInMemDataSize()/8) { // End of File
114 FileIncident endFileIncident(name(), "EndInputFile", "HDF:test.h5"); //FIXME, hardcoded filename
115 m_incidentSvc->fireIncident(endFileIncident);
116 ATH_MSG_INFO("AthenaHDFStreamTool::getLockedEvent: no more events = " << m_event_iter);
117 return(StatusCode::RECOVERABLE);
118 }
119
120 const hsize_t offset[1] = {m_event_iter};
121 H5::DataSpace filespace = dataset.getSpace();
122 const hsize_t mem_size[1] = {2};
123 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
124 H5::DataSpace memspace(1, mem_size);
125 long long unsigned int ds_data[2] = {0, 0};
126 dataset.read(ds_data, H5::PredType::NATIVE_ULLONG, memspace, filespace);
127 std::size_t nbytes = ds_data[1] - ds_data[0];
128 m_token = "[DB=00000000-0000-0000-0000-000000000000][CNT=POOLContainer(DataHeader)][CLID=4DDBD295-EFCE-472A-9EC8-15CD35A9EB8D][TECH=00000401]";
129 char text[64];
130 sprintf(text, fmt_oid, 0ul, nbytes, m_event_iter);
131 text[40] = 0;
132 m_token += text;
133
134 const int length = m_token.size() + 2; //FIXME: copy token
135 *target = new char[length];
136 std::memcpy(static_cast<char*>(*target), m_token.c_str(), length - 1);
137 target[length - 1] = 0;
138 return(StatusCode::SUCCESS);
139}
140
141//___________________________________________________________________________
142StatusCode AthenaHDFStreamTool::lockEvent(long eventNumber) const {
143 ATH_MSG_VERBOSE("AthenaHDFStreamTool::lockEvent: " << eventNumber);
144 m_event_iter = eventNumber;
145 if (eventNumber == 0) {
146 FileIncident beginFileIncident(name(), "BeginInputFile", "HDF:test.h5"); //FIXME, hardcoded filename
147 m_incidentSvc->fireIncident(beginFileIncident);
148 }
149 return(StatusCode::SUCCESS);
150}
151
152//___________________________________________________________________________
153StatusCode AthenaHDFStreamTool::putObject(const void* source, std::size_t nbytes, int/* num*/) {
154 if (nbytes == 0 || m_token.empty()) {
155 return(StatusCode::SUCCESS);
156 }
157 ATH_MSG_INFO("AthenaHDFStreamTool::putObject: source = " << source << ", nbytes = " << nbytes);
158
159 if (m_token.find("[CONT=") != std::string::npos) m_token.replace(m_token.find("[CONT="), 6, "[CNT=");
160 std::string ds_name = m_token.substr(m_token.find("[CNT=") + 5);
161 stringBefore(ds_name,']');
162 while (ds_name.find("/") != std::string::npos) { ds_name = ds_name.replace(ds_name.find("/"), 1, "_"); }
163
164 m_token.replace(m_token.find("[TECH="), 15, "[TECH=00000401]");
165 std::string className = m_token.substr(m_token.find("[PNAME=") + 7);
166 stringBefore(className, ']');
167
168 long long unsigned int positionCount = 0;
169 if (m_token.find("[CLID=") == std::string::npos) { // Core object
170 m_token += "[CLID=" + pool::DbReflex::guid(RootType::ByNameNoQuiet(className)).toString() + "]";
171 } else { // Aux Store extension
172 char text[64];
173 sprintf(text, fmt_aux, nbytes);
174 text[15] = 0;
175 H5::DataSet dataset = m_group->openDataSet(ds_name);
176 const hsize_t offset[1] = {dataset.getInMemDataSize()};
177 positionCount = offset[0];
178 const hsize_t ds_size[1] = {offset[0] + 15};
179 dataset.extend(ds_size);
180 H5::DataSpace filespace = dataset.getSpace();
181 const hsize_t mem_size[1] = {15};
182 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
183 H5::DataSpace memspace(1, mem_size);
184 dataset.write(text, H5::PredType::NATIVE_CHAR, memspace, filespace);
185 }
186
187// Write Payload data
188 if (!m_group->exists(ds_name)) { //if dataset doesn't exist, create it otherwise extend it
189 const hsize_t maxdim[1] = {H5S_UNLIMITED};
190 const hsize_t ds_size[1] = {nbytes};
191 H5::DataSpace filespace(1, ds_size, maxdim);
192 H5::DSetCreatPropList ds_prop;
193 hsize_t chunkdim[1] = {nbytes};
194 if( ds_name.starts_with(APRDefaults::TTreeNames::EventData) ||
195 ds_name.starts_with(APRDefaults::TTreeNames::EventTag) ) {
196 if (nbytes < 512) {
197 chunkdim[0] = 4096;
198 } else if (nbytes < 16 * 512) {
199 chunkdim[0] = 4 * 4096;
200 } else {
201 chunkdim[0] = (hsize_t(nbytes / 4096) + 1) * 4096;
202 }
203 }
204 ds_prop.setChunk(1, chunkdim);
205 char fill_val[1] = {0};
206 ds_prop.setFillValue(H5::PredType::NATIVE_CHAR, fill_val);
207 H5::DataSet dataset = m_group->createDataSet(ds_name, H5::PredType::NATIVE_CHAR, filespace, ds_prop);
208 dataset.write(source, H5::PredType::NATIVE_CHAR);
209 } else {
210 H5::DataSet dataset = m_group->openDataSet(ds_name);
211 const hsize_t offset[1] = {dataset.getInMemDataSize()};
212 positionCount = offset[0];
213 const hsize_t ds_size[1] = {offset[0] + nbytes};
214 dataset.extend(ds_size);
215 H5::DataSpace filespace = dataset.getSpace();
216 const hsize_t mem_size[1] = {nbytes};
217 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
218 H5::DataSpace memspace(1, mem_size);
219 dataset.write(source, H5::PredType::NATIVE_CHAR, memspace, filespace);
220 }
221 if (m_token.find("[OID=") == std::string::npos) { // Core object
222 char text[64];
223 sprintf(text, fmt_oid, 0ul, nbytes, positionCount);
224 text[40] = 0;
225 m_token += text;
226 } else {
227 char text[64];
228 std::size_t firstU, firstL;
229 long long unsigned int second;
230 ::sscanf(m_token.substr(m_token.find("[OID="), 40).c_str(), fmt_oid, &firstU, &firstL, &second);
231 if (firstU == 0ul) { //FIXME1
232 firstU = firstL; // Keep Core object size
233 }
234 firstL = positionCount + nbytes - second;
235 sprintf(text, fmt_oid, firstU, firstL, second); // FIXME
236 text[40] = 0;
237 m_token.replace(m_token.find("[OID="), 39, text);
238 }
239
240 std::string entry_name = ds_name.substr(ds_name.find('(') + 1);
241 stringBefore(entry_name,')');
242// For DataHeader, store entry point
243 if (entry_name == "DataHeader" || entry_name == "DataHeaderForm") {
244 auto dh_entry = ds_name + "_entry";
245 if (!m_group->exists(dh_entry)) {
246 const hsize_t maxdim[1] = {H5S_UNLIMITED};
247 const hsize_t ds_size[1] = {2};
248 H5::DataSpace filespace(1, ds_size, maxdim);
249 H5::DSetCreatPropList ds_prop;
250 const hsize_t chunkdim[1] = {512};
251 ds_prop.setChunk(1, chunkdim);
252 char fill_val[1] = {0};
253 ds_prop.setFillValue(H5::PredType::NATIVE_ULLONG, fill_val);
254 H5::DataSet dataset = m_group->createDataSet(dh_entry, H5::PredType::NATIVE_ULLONG, filespace, ds_prop);
255 long long unsigned int ds_data[2] = {positionCount, positionCount + nbytes};
256 dataset.write(ds_data, H5::PredType::NATIVE_ULLONG);
257 } else {
258 H5::DataSet dataset = m_group->openDataSet(dh_entry);
259 const hsize_t offset[1] = {dataset.getInMemDataSize()/8};//FIXME
260 const hsize_t ds_size[1] = {offset[0] + 1};
261 dataset.extend(ds_size);
262 H5::DataSpace filespace = dataset.getSpace();
263 const hsize_t mem_size[1] = {1};
264 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
265 H5::DataSpace memspace(1, mem_size);
266 long long unsigned int ds_data[1] = {positionCount + nbytes};
267 dataset.write(ds_data, H5::PredType::NATIVE_ULLONG, memspace, filespace);
268 }
269 if (entry_name == "DataHeader") {
270 auto dh_form_entry = ds_name + "_form_entry";
271 if (!m_group->exists(dh_form_entry)) {
272 const hsize_t maxdim[1] = {H5S_UNLIMITED};
273 const hsize_t ds_size[1] = {1};
274 H5::DataSpace filespace(1, ds_size, maxdim);
275 H5::DSetCreatPropList ds_prop;
276 const hsize_t chunkdim[1] = {512};
277 ds_prop.setChunk(1, chunkdim);
278 char fill_val[1] = {0};
279 ds_prop.setFillValue(H5::PredType::NATIVE_ULLONG, fill_val);
280 H5::DataSet dataset = m_group->createDataSet(dh_form_entry, H5::PredType::NATIVE_ULLONG, filespace, ds_prop);
281 long long unsigned int ds_data[1] = {0};
282 dataset.write(ds_data, H5::PredType::NATIVE_ULLONG);
283 } else {
284 H5::DataSet dataset = m_group->openDataSet(dh_form_entry);
285 const hsize_t offset[1] = {dataset.getInMemDataSize()/8};//FIXME
286 const hsize_t ds_size[1] = {offset[0] + 1};
287 dataset.extend(ds_size);
288 H5::DataSpace filespace = dataset.getSpace();
289 const hsize_t mem_size[1] = {1};
290 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
291 H5::DataSpace memspace(1, mem_size);
292 auto dh_form_entry_name = ds_name.substr(0, ds_name.find('(')) + "Form(DataHeaderForm)_entry";
293 H5::DataSet dh_form_dataset = m_group->openDataSet(dh_form_entry_name);
294 long long unsigned int ds_data[1] = {dh_form_dataset.getInMemDataSize()/8 - 1};//FIXME
295 dataset.write(ds_data, H5::PredType::NATIVE_ULLONG, memspace, filespace);
296 }
297 }
298 if (entry_name == "DataHeaderForm") {
299 auto dh_entry_name = ds_name.substr(0, ds_name.find('(') - 4) + "(DataHeader)_form_entry";
300 H5::DataSet dataset = m_group->openDataSet(dh_entry_name);
301 const hsize_t offset[1] = {dataset.getInMemDataSize()/8 - 1};//FIXME
302 H5::DataSpace filespace = dataset.getSpace();
303 const hsize_t mem_size[1] = {1};
304 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
305 H5::DataSpace memspace(1, mem_size);
306 H5::DataSet dh_form_dataset = m_group->openDataSet(ds_name + "_entry");
307 long long unsigned int ds_data[1] = {dh_form_dataset.getInMemDataSize()/8 - 1};//FIXME
308 dataset.write(ds_data, H5::PredType::NATIVE_ULLONG, memspace, filespace);
309 }
310 }
311 return(StatusCode::SUCCESS);
312}
313
314//___________________________________________________________________________
315StatusCode AthenaHDFStreamTool::getObject(void** target, std::size_t& nbytes, int/* num*/) {
316 if (m_token.empty()) {
317 return(StatusCode::SUCCESS);
318 }
319 ATH_MSG_INFO("AthenaHDFStreamTool::getObject: token = " << m_token);
320
321 std::string clid_name = m_token.substr(m_token.find("[CLID=") + 6);
322 stringBefore(clid_name,']');
323 std::string ds_name = m_token.substr(m_token.find("[CNT=") + 5);
324 stringBefore(ds_name, ']');
325 if (ds_name.empty()) {
326 return(StatusCode::SUCCESS);
327 }
328 while (ds_name.find('/') != std::string::npos) { ds_name = ds_name.replace(ds_name.find('/'), 1, "_"); }
329
330 std::string oid_name = m_token.substr(m_token.find("[OID="));
331 auto n = oid_name.find(']') + 1;
332 oid_name.resize(n);
333 std::size_t firstU, firstL;
334 long long unsigned int second;
335 ::sscanf(oid_name.c_str(), fmt_oid, &firstU, &firstL, &second);
336 if (m_read_size > m_read_position + 15) { // aux store data already read
337 std::size_t aux_size = 0;
338 ::sscanf(m_read_data + m_read_position, fmt_aux, &aux_size);
339 m_read_position += 15;
340 *target = m_read_data + m_read_position;
341 nbytes = aux_size;
342 m_read_position += nbytes;
343 char text[64];
344 sprintf(text, fmt_oid, m_read_position, firstL, second); // FIXME
345 text[40] = 0;
346 m_token.replace(m_token.find("[OID="), 39, text);
347 return(StatusCode::SUCCESS);
348 } else if (m_read_size > 0) {
349 return(StatusCode::FAILURE);
350 }
351
352 std::string entry_name = ds_name.substr(ds_name.find('(') + 1);
353 stringBefore(entry_name,')');
354// For DataHeader, get stored size
355 if (entry_name == "DataHeader") {
356 if (clid_name == "7BE56CEF-C866-4BEE-9348-A5F34B5F1DAD") { // DataHeaderForm Token is copied from DataHeader, change container name
357 ds_name.replace(ds_name.find("(DataHeader)"), 12, "Form(DataHeaderForm)");
358 second = m_event_iter; //FIXME, store real DHF id somewhere...
359 } else if (clid_name == "00000000-0000-0000-0000-000000000000") { // Return DataHeader Token, for createAddress
360 if (firstL > 0) { //FIXME1
361 m_token.clear();
362 } else {
363 m_token.replace(m_token.find("[CLID="), 43, "[CLID=4DDBD295-EFCE-472A-9EC8-15CD35A9EB8D]");
364 m_token.replace(m_token.find("[TECH="), 15, "[TECH=00000401]");
365 }
366 nbytes = m_token.size();
367 *target = const_cast<char*>(m_token.c_str());//FIXME
368 return(StatusCode::SUCCESS);
369 } else {
370 m_event_iter = second;
371 }
372
373 auto dh_entry = ds_name + "_entry";
374 H5::DataSet dataset = m_group->openDataSet(dh_entry);
375 if (second + 1 >= dataset.getInMemDataSize()/8) {
376 return(StatusCode::FAILURE);
377 }
378 const hsize_t offset[1] = {second};
379 H5::DataSpace filespace = dataset.getSpace();
380 const hsize_t mem_size[1] = {2};
381 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
382 H5::DataSpace memspace(1, mem_size);
383 long long unsigned int ds_data[2] = {0, 0};
384 dataset.read(ds_data, H5::PredType::NATIVE_ULLONG, memspace, filespace);
385 firstL = ds_data[1] - ds_data[0]; //FIXME1
386 second = ds_data[0];
387 }
388
389 if (!m_group->exists(ds_name)) {
390 return(StatusCode::FAILURE);
391 }
392 H5::DataSet dataset = m_group->openDataSet(ds_name);
393 if (second + firstL > dataset.getInMemDataSize()) {
394 return(StatusCode::FAILURE);
395 }
396 const hsize_t offset[1] = {second};
397 H5::DataSpace filespace = dataset.getSpace();
398 const hsize_t mem_size[1] = {firstL};
399 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
400 H5::DataSpace memspace(1, mem_size);
401 m_read_data = new char[firstL];
402 m_read_size = firstL;
403 dataset.read(m_read_data, H5::PredType::NATIVE_CHAR, memspace, filespace);
404 *target = m_read_data;
405 if (firstU > 0) {
406 ::sscanf(m_read_data + firstL - 15, fmt_aux, &nbytes);
407 } else {
408 nbytes = firstL;
409 }
410 m_read_position = nbytes;
411 return(StatusCode::SUCCESS);
412}
413
414//___________________________________________________________________________
415StatusCode AthenaHDFStreamTool::clearObject(const char** tokenString, int&/* num*/) {
416 std::size_t firstU, firstL;
417 long long unsigned int second;
418 ::sscanf(m_token.substr(m_token.find("[OID="), 40).c_str(), fmt_oid, &firstU, &firstL, &second);
419 std::string ds_name = m_token.substr(m_token.find("[CNT=") + 5);
420 stringBefore(ds_name, ']');
421 while (ds_name.find('/') != std::string::npos) { ds_name = ds_name.replace(ds_name.find('/'), 1, "_"); }
422
423 if (firstU > 0 || ds_name.substr(ds_name.length() - 5, 4) == "Aux.") {
424 if (firstU == 0) firstU = firstL;
425 char text[64];
426 sprintf(text, fmt_aux, firstU);
427 text[15] = 0;
428 H5::DataSet dataset = m_group->openDataSet(ds_name);
429 const hsize_t offset[1] = {dataset.getInMemDataSize()};
430 const hsize_t ds_size[1] = {offset[0] + 15};
431 dataset.extend(ds_size);
432 H5::DataSpace filespace = dataset.getSpace();
433 const hsize_t mem_size[1] = {15};
434 filespace.selectHyperslab(H5S_SELECT_SET, mem_size, offset);
435 H5::DataSpace memspace(1, mem_size);
436 dataset.write(text, H5::PredType::NATIVE_CHAR, memspace, filespace);
437 firstL += 15;
438 firstU = 1ul;
439 sprintf(text, fmt_oid, firstU, firstL, second); // FIXME
440 text[40] = 0;
441 m_token.replace(m_token.find("[OID="), 39, text);
442 }
443 // Return an empty token string for DataHeaderForm, to indicate HDF5 can't update DataHeader after it was written.
444 std::string entry_name = ds_name.substr(ds_name.find('(') + 1);
445 stringBefore(entry_name, ')');
446 if (entry_name == "DataHeaderForm") {
447 m_token.clear();
448 }
449 *tokenString = m_token.c_str();
450 return(StatusCode::SUCCESS);
451}
452
453//___________________________________________________________________________
454StatusCode AthenaHDFStreamTool::lockObject(const char* tokenString, int/* num*/) {
455 m_token = tokenString;
456 delete [] m_read_data; m_read_data = nullptr;
457 m_read_size = 0;
458 m_read_position = 0;
459 return(StatusCode::SUCCESS);
460}
#define ATH_MSG_ERROR(x)
#define ATH_MSG_FATAL(x)
#define ATH_MSG_INFO(x)
#define ATH_MSG_VERBOSE(x)
static const char *const fmt_oid
static const char *const fmt_aux
This file contains the class definition for the AthenaHDFStreamTool class.
double length(const pvec &v)
static Double_t sc
virtual ~AthenaHDFStreamTool()
Destructor.
StatusCode lockObject(const char *tokenString, int num=0)
long long unsigned int m_event_iter
StatusCode getObject(void **target, std::size_t &nbytes, int num=0)
AthenaHDFStreamTool(const std::string &type, const std::string &name, const IInterface *parent)
Standard Service Constructor.
StatusCode clearObject(const char **tokenString, int &num)
ServiceHandle< IIncidentSvc > m_incidentSvc
StatusCode makeClient(int num, std::string &streamPortSuffix)
StatusCode initialize()
Gaudi Service Interface method implementations:
StatusCode getLockedEvent(void **target, unsigned int &status) const
StatusCode putObject(const void *source, std::size_t nbytes, int num=0)
StatusCode makeServer(int num, const std::string &streamPortSuffix)
StatusCode lockEvent(long eventNumber) const
constexpr void toString(std::span< char, StrLen > buf, bool uppercase=true) const noexcept
Automatic conversion to string representation.
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Definition RootType.cxx:586
static Guid guid(const TypeH &id)
Determine Guid (normalized string form) from reflection type.
static constexpr const char * EventTag
Definition APRDefaults.h:13
static constexpr const char * EventData
Definition APRDefaults.h:12