11 #include <boost/shared_ptr.hpp>
14 #include "EventStorage/DataWriterCallBack.h"
15 #include "EventStorage/FileNameCallback.h"
16 #include "EventStorage/RawFileName.h"
17 #include "EventStorage/EventStorageIssues.h"
18 #include "EventStorage/SimpleFileName.h"
21 #include <sys/types.h>
28 using EventStorage::DataWriterCallBack;
29 using EventStorage::DWError;
30 using EventStorage::FileNameCallback;
31 using EventStorage::SimpleFileName;
35 charAddress(
auto &
val){
36 return reinterpret_cast<char *
>(&
val);
39 ccharAddress(
const auto &
val){
40 return reinterpret_cast<const char *
>(&
val);
44 bytefAddress(
auto &
val){
45 return reinterpret_cast<Bytef *
>(&
val);
48 cbytefAddress(
auto &
val){
49 return reinterpret_cast<const Bytef *
>(&
val);
56 DataWriter(
const string& writingPath,
57 boost::shared_ptr<FileNameCallback> theFNCB,
63 const unsigned int lumiBlockNumber,
64 const std::string& applicationName,
65 const std::vector<std::string>& fmdStrings,
67 const unsigned int compLevel)
69 initDW(writingPath, theFNCB, rPar,
project,
streamType,
streamName,
stream, lumiBlockNumber, applicationName, fmdStrings,
compression, compLevel);
74 DataWriter(
const string& writingPath,
75 const string& fileNameCore,
78 const unsigned int startIndex,
80 const unsigned int compLevel)
82 boost::shared_ptr<daq::RawFileName>
83 fileName(
new daq::RawFileName(fileNameCore));
87 m_streamType =
fileName->streamType();
88 m_streamName =
fileName->streamName();
90 m_lumiBlockNumber =
fileName->lumiBlockNumber();
91 m_applicationName =
fileName->applicationName();
93 std::stringstream mystream;
94 mystream <<
"FileName " <<
fileName->fileName()
95 <<
" has failed interpretation."
96 <<
" The file header will have no values for "
97 <<
" project, stream, lumiBlock and appName";
98 ERS_DEBUG(1,mystream.str());
105 m_lumiBlockNumber = 0;
106 m_applicationName =
"";
109 fileName->setFileSequenceNumber(startIndex);
111 initDW(writingPath,
fileName, rPar, m_project, m_streamType, m_streamName,
112 m_stream, m_lumiBlockNumber, m_applicationName,
119 DataWriter(
const string& writingPath,
120 const string& fileNameCore,
125 const std::string&
stream,
126 const unsigned int lumiBlockNumber,
127 const std::string& applicationName,
128 const std::vector<std::string>& fmdStrings,
130 const unsigned int compLevel)
132 boost::shared_ptr<FileNameCallback>
fileName(
new SimpleFileName(fileNameCore));
133 initDW(writingPath,
fileName, rPar,
project,
streamType,
streamName,
stream, lumiBlockNumber, applicationName, fmdStrings,
compression, compLevel);
138 DataWriter::~DataWriter()
140 ERS_DEBUG(1,
"DataWriter::~DataWriter() called.");
142 m_file_end_record.status = 1;
143 if(m_cFileOpen) closeFile();
147 ERS_DEBUG(1,
"DataWriter::~DataWriter() finished.");
152 initDW(
const string& writingPath,
153 boost::shared_ptr<FileNameCallback> theFNCB,
158 const std::string&
stream,
159 const unsigned int lumiBlockNumber,
160 const std::string& applicationName,
161 const std::vector<std::string>& fmdStrings,
163 const unsigned int compLevel)
169 m_lumiBlockNumber = lumiBlockNumber;
170 m_applicationName = applicationName;
172 m_filenamecallback = theFNCB;
179 m_latestPosition = -1;
181 m_writePath = writingPath;
182 m_nextWritePath =
"";
185 m_file_name_strings.appName = m_applicationName;
187 m_file_name_strings.fileNameCore = m_filenamecallback->getCoreName();
189 setRunParamsRecord(rPar);
191 m_file_start_record.sizeLimit_MB = 0;
192 m_file_start_record.sizeLimit_dataBlocks = 0;
193 date_timeAsInt(m_file_start_record.date,m_file_start_record.time);
196 m_openFailed =
false;
200 m_fmdStrings = fmdStrings;
203 m_complevel = compLevel;
208 m_fmdStrings.push_back(
"Stream=" + m_stream );
209 m_fmdStrings.push_back(
"Project=" + m_project );
210 std::ostringstream o;
211 o << m_lumiBlockNumber;
212 m_fmdStrings.push_back(
"LumiBlock=" + o.str());
215 m_fmdStrings.push_back(offline_EventStorage_v5::compressiontag +
216 "=" + offline_EventStorage_v5::type_to_string(m_compression));
219 m_check = ::adler32(0
L, Z_NULL, 0);
223 this->spaceForGuid();
224 this->openNextFile();
239 localtime_r( &a_time, &
t);
241 getDate= 1000000*
t.tm_mday+
254 if(m_openFailed)
return false;
258 return m_cFile.good();
269 DWError DataWriter::setMaxFileMB(
const unsigned int& maxFileMB)
271 m_file_start_record.sizeLimit_MB = maxFileMB;
272 return EventStorage::DWOK;
275 DWError DataWriter::setMaxFileNE(
const unsigned int& maxFileNE)
277 m_file_start_record.sizeLimit_dataBlocks = maxFileNE;
278 return EventStorage::DWOK;
285 m_internal_run_parameters_record.marker = rPar.
marker;
286 m_internal_run_parameters_record.record_size = rPar.
record_size;
287 m_internal_run_parameters_record.run_number = rPar.
run_number;
288 m_internal_run_parameters_record.max_events = rPar.
max_events;
289 m_internal_run_parameters_record.rec_enable = rPar.
rec_enable;
290 m_internal_run_parameters_record.trigger_type = rPar.
trigger_type;
291 m_internal_run_parameters_record.detector_mask_1of2 = rPar.
detector_mask & 0xFFFFFFFF;
292 m_internal_run_parameters_record.detector_mask_2of2 = (rPar.
detector_mask>> 32);
293 m_internal_run_parameters_record.beam_type = rPar.
beam_type;
294 m_internal_run_parameters_record.beam_energy = rPar.
beam_energy;
298 DWError DataWriter::putData(
const unsigned int&
dataSize,
const void *
event)
305 return this->putData(1, &
iov, todisk);
309 DWError DataWriter::putData(
const unsigned int&
entries,
const struct iovec * my_iovec)
312 return this->putData(
entries, my_iovec, todisk);
320 return this->putData(1, &
iov, sizeToDisk);
324 return this->putData_implementation(
entries, my_iovec, sizeToDisk);
328 DWError DataWriter::putPrecompressedData(
const unsigned int&
dataSize,
334 return this->putPrecompressedData(1, &
iov);
338 DWError DataWriter::putPrecompressedData(
const unsigned int&
entries,
342 return this->putData_implementation(
entries, my_iovec, todisk,
true);
346 DWError DataWriter::putData_implementation(
const unsigned int&
entries,
const iovec_const * my_iovec,
uint32_t& sizeToDisk,
bool precompressed){
348 ERS_DEBUG(3,
"DataWriter::putData called for an iovec.");
349 if(!m_cFileOpen) openNextFile();
351 if(!m_cFileOpen)
return EventStorage::DWNOOK;
353 if(!m_cFile.good())
return EventStorage::DWNOOK;
367 if(m_file_start_record.sizeLimit_MB>0) {
368 double eventMB=
static_cast<double>(
dataSize)/(1024*1024);
369 double dataMB=m_cFileMB+eventMB;
370 if(
static_cast<unsigned int>(dataMB+0.5) >
371 m_file_start_record.sizeLimit_MB) {
372 ERS_DEBUG(3,
"Approaching file size limit in MB.");
377 if((m_file_start_record.sizeLimit_dataBlocks>0) &&
378 (m_file_end_record.events_in_file >= m_file_start_record.sizeLimit_dataBlocks)){
379 ERS_DEBUG(3,
"Approaching file size limit in number of events.");
385 if(m_compression ==
ZLIB && !precompressed){
387 offline_EventStorage_v5::zlibcompress(*m_compressed, sizeToDisk,
389 }
catch(offline_EventStorage_v5::CompressionIssue& ex){
390 EventStorage::WritingIssue ci(ERS_HERE,
"Data compression failed.", ex);
392 return EventStorage::DWNOOK;
402 m_latestPosition = m_cFile.tellg();
408 if(m_compression !=
ZLIB || precompressed){
410 m_cFile.write(
static_cast<const char*
>(my_iovec[
i].iov_base),
411 my_iovec[
i].iov_len);
413 m_check = ::adler32(m_check,
414 static_cast<const Bytef *
>(my_iovec[
i].iov_base),
415 my_iovec[
i].iov_len);
418 m_cFile.write(
static_cast<const char *
>(m_compressed->handle()), sizeToDisk);
419 m_check = ::adler32(m_check,
420 static_cast<const Bytef *
>(m_compressed->handle()),
425 m_cFileMB +=
static_cast<double>(sizeToDisk)/(1024*1024);
426 m_runMB +=
static_cast<double>(sizeToDisk)/(1024*1024);
427 m_file_end_record.events_in_file++;
428 m_file_end_record.events_in_run++;
430 ERS_DEBUG(3,
"Event "<< m_file_end_record.events_in_run<<
" is written at offset "<<m_latestPosition);
432 if(!m_cFile.good())
return EventStorage::DWNOOK;
434 return EventStorage::DWOK;
440 return m_latestPosition;
444 unsigned int DataWriter::eventsInFile()
const
446 return m_file_end_record.events_in_file;
449 unsigned int DataWriter::eventsInFileSequence()
const
451 return m_file_end_record.events_in_run;
454 unsigned int DataWriter::dataMB_InFile()
const
456 return static_cast<unsigned int>(m_cFileMB+0.5);
459 unsigned int DataWriter::dataMB_InFileSequence()
const
461 return static_cast<unsigned int>(m_runMB+0.5);
465 DWError DataWriter::closeFile()
467 ERS_DEBUG(3,
"DataWriter::closeFile() called.");
470 m_file_end_record.data_in_file =
static_cast<unsigned int>(m_cFileMB+0.5);
471 m_file_end_record.data_in_run =
static_cast<unsigned int>(m_runMB+0.5);
473 date_timeAsInt(m_file_end_record.date,m_file_end_record.time);
479 std::ostringstream oss;
482 oss << std::hex << std::uppercase <<m_check;
483 std::string checksum = oss.str();
484 m_check = ::adler32(0
L, Z_NULL, 0);
491 EventStorage::WritingIssue ci(ERS_HERE,
"Rename failed.");
493 return EventStorage::DWNOOK;
500 if(m_callBack != NULL) {
501 ERS_DEBUG(3,
"Execute callback from DataWriter.");
502 m_callBack->FileWasClosed(
503 m_filenamecallback->getCurrentFileName(),
510 m_file_end_record.events_in_file,
511 m_internal_run_parameters_record.run_number,
512 m_file_start_record.file_number,
519 if(m_nextWritePath !=
"") {
520 m_writePath=m_nextWritePath;
522 ERS_DEBUG(3,
"New writing path became effective. The path is "
528 m_filenamecallback->advance();
529 }
catch(EventStorage::ES_SingleFile &
e){
533 return EventStorage::DWOK;
535 return EventStorage::DWNOOK;
539 DWError DataWriter::nextFile()
541 ERS_DEBUG(2,
"DataWriter::nextFile() called.");
543 DWError we=closeFile();
552 if(m_openFailed) we=EventStorage::DWNOOK;
558 ifstream
test(
name.c_str(),ios::binary | ios::in);
559 bool isThere =
test.good();
565 void DataWriter::openNextFile()
568 m_file_end_record.events_in_file=0;
571 m_file_start_record.file_number = m_filenamecallback->getIndex();
572 }
catch(EventStorage::ES_SingleFile &
e){
573 m_file_start_record.file_number++;
575 date_timeAsInt(m_file_start_record.date,m_file_start_record.time);
585 EventStorage::WritingIssue ci(ERS_HERE,
err.c_str());
591 EventStorage::WritingIssue ci(ERS_HERE,
err.c_str());
594 m_filenamecallback->fileAlreadyExists();
596 m_file_start_record.file_number = m_filenamecallback->getIndex();
599 oss <<
"increase file number to ";
600 oss << m_file_start_record.file_number;
601 EventStorage::WritingIssue ci(ERS_HERE, oss.str().c_str());
604 ERS_DEBUG(2,
"OK to write file with number "<<m_file_start_record.file_number);
612 m_check = ::adler32(0
L, Z_NULL, 0);
615 file_record(m_file_name_strings);
619 ERS_DEBUG(3,
"Writing "<<m_fmdStrings.size()<<
" metadata strings.");
620 if(m_fmdStrings.size()>0) file_record(m_fmdStrings);
622 ERS_DEBUG(3,
"Status of the file. good() returns "<<m_cFile.good());
623 m_cFileOpen = m_cFile.good();
624 m_openFailed = !m_cFile.good();
626 if((m_callBack != NULL) && m_cFileOpen) {
627 ERS_DEBUG(3,
"Execute callback from DataWriter.");
628 m_callBack->FileWasOpened(
629 m_filenamecallback->getCurrentFileName(),
635 m_internal_run_parameters_record.run_number,
636 m_file_start_record.file_number,
645 n << m_writePath <<
"/";
646 n << m_filenamecallback->getCurrentFileName((
FINISHED ==
fs)?
false:
true);
647 string name =
n.str();
652 m_nextWritePath = writingPath;
653 ERS_DEBUG(3,
"Directory changed to "<<m_nextWritePath
654 <<
" (effective for hext file).");
657 bool DataWriter::inTransition()
const {
660 if((m_nextWritePath !=
"") &&
661 (m_nextWritePath != m_writePath)) inT=
true;
666 void DataWriter::file_record(
void *ri,
const void *
pi) {
675 m_check = ::adler32(m_check,
reinterpret_cast<const Bytef*
>(
pattern+
i),
sizeof(
uint32_t));
677 m_cFile.write(
reinterpret_cast<char*
>(record+
i),
sizeof(
uint32_t));
678 m_check = ::adler32(m_check,
reinterpret_cast<const Bytef*
>(record+
i),
sizeof(
uint32_t));
686 const char *cName = nst.
appName.c_str();
693 m_check = ::adler32(m_check,
reinterpret_cast<const Bytef*
>
697 m_cFile.write(charAddress(sizeName),
sizeof(
uint32_t));
698 m_check = ::adler32(m_check,cbytefAddress(sizeName),
sizeof(
uint32_t));
699 m_cFile.write(cName,sizeName);
700 m_check = ::adler32(m_check,
reinterpret_cast<const Bytef*
>(cName),sizeName);
702 char ns = sizeName % 4;
704 m_cFile.write(
" ",4-
ns);
705 m_check = ::adler32(m_check,
reinterpret_cast<const Bytef*
>(
" "),4-
ns);
708 m_cFile.write(charAddress(sizeTag),
sizeof(
uint32_t));
709 m_check = ::adler32(m_check,bytefAddress(sizeTag),
sizeof(
uint32_t));
710 m_cFile.write(cTag,sizeTag);
711 m_check = ::adler32(m_check,
reinterpret_cast<const Bytef*
>(cTag),sizeTag);
715 m_cFile.write(
" ",4-
ns);
716 m_check = ::adler32(m_check,
reinterpret_cast<const Bytef*
>(
" "),4-
ns);
723 ERS_DEBUG(2,
"Writing the metadata strings.");
726 m_check = ::adler32(m_check,
reinterpret_cast<const Bytef*
>
730 uint32_t nstrings = fmdStrings.size();
731 m_cFile.write(charAddress(nstrings),
sizeof(
uint32_t));
732 m_check = ::adler32(m_check, bytefAddress(nstrings),
735 vector<string>::const_iterator
it;
736 for(
it=fmdStrings.begin();
it!=fmdStrings.end(); ++
it) {
738 const char *cst =
it->c_str();
741 m_cFile.write(charAddress(slen),
sizeof(
uint32_t));
742 m_check = ::adler32(m_check,bytefAddress(slen),
sizeof(
uint32_t));
743 m_cFile.write(cst,slen);
744 m_check = ::adler32(m_check,
reinterpret_cast<const Bytef*
>(cst),slen);
747 m_cFile.write(
" ",4-
ns);
748 m_check = ::adler32(m_check,
reinterpret_cast<const Bytef*
>(
" "),4-
ns);
753 void DataWriter::registerCallBack(DataWriterCallBack *pUserCBclass) {
754 m_callBack = pUserCBclass;
755 if(m_callBack != NULL) {
756 m_callBack->FileWasOpened(
757 m_filenamecallback->getCurrentFileName(),
763 m_internal_run_parameters_record.run_number,
764 m_file_start_record.file_number,
770 void DataWriter::spaceForGuid() {
772 m_fmdStrings.insert(m_fmdStrings.begin(),
e);
775 void DataWriter::replaceGuid() {
778 if (m_next_guid ==
""){
780 m_guid =
g.toString();
782 m_guid = m_next_guid;
786 string e1=
"GUID=" + m_guid;
790 void DataWriter::setGuid(
const std::string&
Guid){
798 if (::stat64(fileNameCore.c_str(), &
tmp) == -1) {
799 EventStorage::WritingIssue ci(ERS_HERE,
"stat64 failed.");
810 return m_compression;