11#include <boost/shared_ptr.hpp>
14#include "EventStorage/DataWriterCallBack.h"
15#include "EventStorage/FileNameCallback.h"
16#include "EventStorage/RawFileName.h"
17#include "EventStorage/EventStorageIssues.h"
18#include "EventStorage/SimpleFileName.h"
28using EventStorage::DataWriterCallBack;
29using EventStorage::DWError;
30using EventStorage::FileNameCallback;
31using EventStorage::SimpleFileName;
35 charAddress(
auto & val){
36 return reinterpret_cast<char *
>(&
val);
39 ccharAddress(
const auto & val){
40 return reinterpret_cast<const char *
>(&
val);
44 bytefAddress(
auto & val){
45 return reinterpret_cast<Bytef *
>(&
val);
48 cbytefAddress(
auto & val){
49 return reinterpret_cast<const Bytef *
>(&
val);
57 boost::shared_ptr<FileNameCallback> theFNCB,
60 const std::string& streamType,
61 const std::string& streamName,
62 const std::string& stream,
63 const unsigned int lumiBlockNumber,
64 const std::string& applicationName,
65 const std::vector<std::string>& fmdStrings,
67 const unsigned int compLevel)
69 initDW(writingPath, theFNCB, rPar,
project, streamType, streamName, stream, lumiBlockNumber, applicationName, fmdStrings, compression, compLevel);
75 const string& fileNameCore,
78 const unsigned int startIndex,
80 const unsigned int compLevel)
82 boost::shared_ptr<daq::RawFileName>
83 fileName(
new daq::RawFileName(fileNameCore));
85 if ( fileName->hasValidCore() ) {
93 std::stringstream mystream;
94 mystream <<
"FileName " << fileName->fileName()
95 <<
" has failed interpretation."
96 <<
" The file header will have no values for "
97 <<
" project, stream, lumiBlock and appName";
98 ERS_DEBUG(1,mystream.str());
109 fileName->setFileSequenceNumber(startIndex);
113 fmdStrings, compression,compLevel);
120 const string& fileNameCore,
123 const std::string& streamType,
124 const std::string& streamName,
125 const std::string& stream,
126 const unsigned int lumiBlockNumber,
127 const std::string& applicationName,
128 const std::vector<std::string>& fmdStrings,
130 const unsigned int compLevel)
132 boost::shared_ptr<FileNameCallback> fileName(
new SimpleFileName(fileNameCore));
133 initDW(writingPath, fileName, rPar,
project, streamType, streamName, stream, lumiBlockNumber, applicationName, fmdStrings, compression, compLevel);
140 ERS_DEBUG(1,
"DataWriter::~DataWriter() called.");
147 ERS_DEBUG(1,
"DataWriter::~DataWriter() finished.");
152initDW(
const string& writingPath,
153 boost::shared_ptr<FileNameCallback> theFNCB,
156 const std::string& streamType,
157 const std::string& streamName,
158 const std::string& stream,
159 const unsigned int lumiBlockNumber,
160 const std::string& applicationName,
161 const std::vector<std::string>& fmdStrings,
163 const unsigned int compLevel)
210 std::ostringstream o;
215 m_fmdStrings.push_back(offline_EventStorage_v5::compressiontag +
216 "=" + offline_EventStorage_v5::type_to_string(
m_compression));
219 m_check = ::adler32(0L, Z_NULL, 0);
239 localtime_r( &a_time, &t);
241 getDate= 1000000*t.tm_mday+
245 getTime= 10000*t.tm_hour+
272 return EventStorage::DWOK;
278 return EventStorage::DWOK;
303 iov.iov_base = event;
304 iov.iov_len = dataSize;
305 return this->
putData(1, &iov, todisk);
312 return this->
putData(entries, my_iovec, todisk);
318 iov.iov_base = event;
319 iov.iov_len = dataSize;
320 return this->
putData(1, &iov, sizeToDisk);
332 iov.iov_base = event;
333 iov.iov_len = dataSize;
348 ERS_DEBUG(3,
"DataWriter::putData called for an iovec.");
353 if(!
m_cFile.good())
return EventStorage::DWNOOK;
356 unsigned int dataSize = 0;
357 for(
unsigned int i = 0; i<
entries; i++) {
358 dataSize += my_iovec[i].
iov_len;
363 sizeToDisk = dataSize;
368 double eventMB=
static_cast<double>(dataSize)/(1024*1024);
370 if(
static_cast<unsigned int>(dataMB+0.5) >
372 ERS_DEBUG(3,
"Approaching file size limit in MB.");
379 ERS_DEBUG(3,
"Approaching file size limit in number of events.");
387 offline_EventStorage_v5::zlibcompress(*
m_compressed, sizeToDisk,
389 }
catch(offline_EventStorage_v5::CompressionIssue& ex){
390 EventStorage::WritingIssue ci(ERS_HERE,
"Data compression failed.", ex);
392 return EventStorage::DWNOOK;
409 for (
unsigned int i =0; i<
entries; i++){
410 m_cFile.write(
static_cast<const char*
>(my_iovec[i].iov_base),
411 my_iovec[i].iov_len);
414 static_cast<const Bytef *
>(my_iovec[i].iov_base),
415 my_iovec[i].iov_len);
425 m_cFileMB +=
static_cast<double>(sizeToDisk)/(1024*1024);
426 m_runMB +=
static_cast<double>(sizeToDisk)/(1024*1024);
432 if(!
m_cFile.good())
return EventStorage::DWNOOK;
434 return EventStorage::DWOK;
456 return static_cast<unsigned int>(
m_cFileMB+0.5);
461 return static_cast<unsigned int>(
m_runMB+0.5);
467 ERS_DEBUG(3,
"DataWriter::closeFile() called.");
479 std::ostringstream oss;
482 oss << std::hex << std::uppercase <<
m_check;
483 std::string checksum = oss.str();
484 m_check = ::adler32(0L, Z_NULL, 0);
491 EventStorage::WritingIssue ci(ERS_HERE,
"Rename failed.");
493 return EventStorage::DWNOOK;
501 ERS_DEBUG(3,
"Execute callback from DataWriter.");
522 ERS_DEBUG(3,
"New writing path became effective. The path is "
529 }
catch(EventStorage::ES_SingleFile &e){
533 return EventStorage::DWOK;
535 return EventStorage::DWNOOK;
541 ERS_DEBUG(2,
"DataWriter::nextFile() called.");
558 ifstream test(name.c_str(),ios::binary | ios::in);
559 bool isThere = test.good();
572 }
catch(EventStorage::ES_SingleFile &e){
585 EventStorage::WritingIssue ci(ERS_HERE, err.c_str());
591 EventStorage::WritingIssue ci(ERS_HERE, err.c_str());
599 oss <<
"increase file number to ";
601 EventStorage::WritingIssue ci(ERS_HERE, oss.str().c_str());
610 ios::out | ios::app | ios::binary);
612 m_check = ::adler32(0L, Z_NULL, 0);
619 ERS_DEBUG(3,
"Writing "<<
m_fmdStrings.size()<<
" metadata strings.");
622 ERS_DEBUG(3,
"Status of the file. good() returns "<<
m_cFile.good());
627 ERS_DEBUG(3,
"Execute callback from DataWriter.");
647 string name = n.str();
654 <<
" (effective for hext file).");
668 uint32_t *record =
reinterpret_cast<uint32_t *
>(ri);
669 const uint32_t *pattern =
reinterpret_cast<const uint32_t *
>(
pi);
672 for(
int i=0; i<size; i++) {
673 if(pattern[i] != 0) {
674 m_cFile.write(
reinterpret_cast<const char*
>(pattern+i),
sizeof(uint32_t));
675 m_check = ::adler32(
m_check,
reinterpret_cast<const Bytef*
>(pattern+i),
sizeof(uint32_t));
677 m_cFile.write(
reinterpret_cast<char*
>(record+i),
sizeof(uint32_t));
678 m_check = ::adler32(
m_check,
reinterpret_cast<const Bytef*
>(record+i),
sizeof(uint32_t));
686 const char *cName = nst.
appName.c_str();
689 uint32_t sizeName = nst.
appName.size();
697 m_cFile.write(charAddress(sizeName),
sizeof(uint32_t));
698 m_check = ::adler32(
m_check,cbytefAddress(sizeName),
sizeof(uint32_t));
700 m_check = ::adler32(
m_check,
reinterpret_cast<const Bytef*
>(cName),sizeName);
702 char ns = sizeName % 4;
705 m_check = ::adler32(
m_check,
reinterpret_cast<const Bytef*
>(
" "),4-ns);
708 m_cFile.write(charAddress(sizeTag),
sizeof(uint32_t));
709 m_check = ::adler32(
m_check,bytefAddress(sizeTag),
sizeof(uint32_t));
711 m_check = ::adler32(
m_check,
reinterpret_cast<const Bytef*
>(cTag),sizeTag);
716 m_check = ::adler32(
m_check,
reinterpret_cast<const Bytef*
>(
" "),4-ns);
723 ERS_DEBUG(2,
"Writing the metadata strings.");
730 uint32_t nstrings = fmdStrings.size();
731 m_cFile.write(charAddress(nstrings),
sizeof(uint32_t));
735 vector<string>::const_iterator it;
736 for(it=fmdStrings.begin(); it!=fmdStrings.end(); ++it) {
738 const char *cst = it->c_str();
739 uint32_t slen = it->size();
741 m_cFile.write(charAddress(slen),
sizeof(uint32_t));
744 m_check = ::adler32(
m_check,
reinterpret_cast<const Bytef*
>(cst),slen);
748 m_check = ::adler32(
m_check,
reinterpret_cast<const Bytef*
>(
" "),4-ns);
797 if (::stat64(fileNameCore.c_str(), &tmp) == -1) {
798 EventStorage::WritingIssue ci(ERS_HERE,
"stat64 failed.");
T_ResultType project(ParameterMapping::type< N > parameter_map, const T_Matrix &matrix)
This class provides an encapsulation of a GUID/UUID/CLSID/IID data structure (a 128-bit number).
freeMetaDataStrings m_fmdStrings
void date_timeAsInt(uint32_t &getDate, uint32_t &getTime) const
void initDW(const std::string &writingPath, boost::shared_ptr< EventStorage::FileNameCallback > theFNCB, const run_parameters_record &rPar, const std::string &project, const std::string &streamType, const std::string &streamName, const std::string &stream, const unsigned int lumiBlockNumber, const std::string &applicationName, const std::vector< std::string > &fmdStrings, const CompressionType compression, const unsigned int compLevel)
EventStorage::DWError putPrecompressedData(const unsigned int &dataSize, const void *event)
Write a single block of data already compressed.
EventStorage::DWError setMaxFileMB(const unsigned int &maxFileMB)
max size of 1 file in MB.
EventStorage::DWError putData_implementation(const unsigned int &entries, const iovec_const *my_iovec, uint32_t &sizeToDisk, bool precompressed=false)
void registerCallBack(EventStorage::DataWriterCallBack *pUserCBclass)
If you want to define actions to be taken when a file is opened or closed, register your call-back class...
internal_run_parameters_record m_internal_run_parameters_record
void file_record(void *ri, const void *pi)
std::string nameFile(const FileStatus &fs) const
get the name of the current file
file_name_strings m_file_name_strings
EventStorage::DataWriterCallBack * m_callBack
EventStorage::DWError closeFile()
Close the currently open file.
offline_EventStorage_v5::iovec_const iovec_const
unsigned int m_lumiBlockNumber
file_end_record m_file_end_record
~DataWriter()
destructor, the way to close the sequence of files
unsigned int dataMB_InFileSequence() const
number of megabytes written to the file sequence
bool fileExists(const std::string &name) const
file_start_record m_file_start_record
void cd(const std::string &writingPath)
set another writing path
bool good() const
feedback to user
EventStorage::DWError setMaxFileNE(const unsigned int &maxFileNE)
max size of 1 file in number of data blocks (or events); 1 putData call is one block
uint64_t getFileSize(const std::string &fileNameCore) const
DataBuffer * m_compressed
std::string m_nextWritePath
unsigned int dataMB_InFile() const
number of megabytes written to the current file
unsigned int eventsInFileSequence() const
number of events written to the file sequence
int64_t getPosition() const
get the offset of the latest written event; one should call it AFTER putData(...)
void setGuid(const std::string &Guid)
Externally set the GUID for the next file in this sequence.
boost::shared_ptr< EventStorage::FileNameCallback > m_filenamecallback
void setRunParamsRecord(const run_parameters_record &rPar)
CompressionType getCompression() const
Returns the compression type for this file.
bool inTransition() const
The file currently open is not closed immediately after cd(); this method will return true if we are i...
EventStorage::DWError putData(const unsigned int &dataSize, const void *event)
Write a single block of data.
CompressionType m_compression
std::string m_applicationName
EventStorage::DWError nextFile()
close the file and open next one immediately
unsigned int eventsInFile() const
number of events written to the currently open file
const file_end_record file_end_pattern
const data_separator_record data_separator_pattern
const uint32_t file_name_strings_marker
void reset_record(void *ri, const void *pi)
std::vector< std::string > freeMetaDataStrings
const internal_run_parameters_record run_parameters_pattern
const uint32_t free_strings_marker
const file_start_record file_start_pattern
uint32_t data_block_number