11 #include <boost/shared_ptr.hpp>
14 #include "EventStorage/DataWriterCallBack.h"
15 #include "EventStorage/FileNameCallback.h"
16 #include "EventStorage/RawFileName.h"
17 #include "EventStorage/EventStorageIssues.h"
18 #include "EventStorage/SimpleFileName.h"
21 #include <sys/types.h>
28 using EventStorage::DataWriterCallBack;
29 using EventStorage::DWError;
30 using EventStorage::FileNameCallback;
31 using EventStorage::SimpleFileName;
// Constructor overload that receives a ready-made FileNameCallback.
// Only part of the parameter list is visible in this excerpt (rPar, project,
// streamType, streamName, stream and compression are used below but declared
// on lines that are not shown).
35 DataWriter(
const string& writingPath,
// File-naming callback; passed on to initDW() unchanged.
36 boost::shared_ptr<FileNameCallback> theFNCB,
42 const unsigned int lumiBlockNumber,
43 const std::string& applicationName,
44 const std::vector<std::string>& fmdStrings,
46 const unsigned int compLevel)
// The visible body does nothing but forward every argument to initDW().
48 initDW(writingPath, theFNCB, rPar,
project,
streamType,
streamName,
stream, lumiBlockNumber, applicationName, fmdStrings,
compression, compLevel);
// Constructor overload that takes only a file name core and a start index.
// The stream/lumi-block/application metadata is not passed in; it is read
// back from a daq::RawFileName built from fileNameCore.
// Parts of the parameter list and of the body are not visible in this excerpt.
53 DataWriter(
const string& writingPath,
54 const string& fileNameCore,
// Sequence number given to the file name object before initDW() is called.
57 const unsigned int startIndex,
59 const unsigned int compLevel)
// Build the file name object from the core name supplied by the caller.
61 boost::shared_ptr<daq::RawFileName>
62 fileName(
new daq::RawFileName(fileNameCore));
// Take the metadata from the file name object.
// NOTE(review): the condition guarding these assignments is not visible here;
// presumably they run only when the name could be interpreted - confirm.
66 m_streamType =
fileName->streamType();
67 m_streamName =
fileName->streamName();
69 m_lumiBlockNumber =
fileName->lumiBlockNumber();
70 m_applicationName =
fileName->applicationName();
// Debug message for a file name that could not be interpreted.
72 std::stringstream mystream;
73 mystream <<
"FileName " <<
fileName->fileName()
74 <<
" has failed interpretation."
75 <<
" The file header will have no values for "
76 <<
" project, stream, lumiBlock and appName";
77 ERS_DEBUG(1,mystream.str());
// Fallback values for the metadata (further resets are presumably on the
// lines that are not shown - confirm).
84 m_lumiBlockNumber = 0;
85 m_applicationName =
"";
// Start the file sequence at the index requested by the caller.
88 fileName->setFileSequenceNumber(startIndex);
// Hand over to the common initialisation, using the members filled in above.
// The remainder of this call is not visible in this excerpt.
90 initDW(writingPath,
fileName, rPar, m_project, m_streamType, m_streamName,
91 m_stream, m_lumiBlockNumber, m_applicationName,
// Constructor overload that takes a file name core plus explicit metadata.
// The core name is wrapped in a SimpleFileName, which is then used as the
// FileNameCallback for initDW().
// Part of the parameter list (rPar, project, streamType, streamName,
// compression) is declared on lines that are not visible in this excerpt.
98 DataWriter(
const string& writingPath,
99 const string& fileNameCore,
104 const std::string&
stream,
105 const unsigned int lumiBlockNumber,
106 const std::string& applicationName,
107 const std::vector<std::string>& fmdStrings,
109 const unsigned int compLevel)
// Wrap the caller's core name in the simple naming callback.
111 boost::shared_ptr<FileNameCallback>
fileName(
new SimpleFileName(fileNameCore));
// Forward everything to the common initialisation routine.
112 initDW(writingPath,
fileName, rPar,
project,
streamType,
streamName,
stream, lumiBlockNumber, applicationName, fmdStrings,
compression, compLevel);
// Destructor: sets the status field of the file end record to 1 and closes
// the current file if one is open.
// Some lines of the body are not visible in this excerpt.
117 DataWriter::~DataWriter()
119 ERS_DEBUG(1,
"DataWriter::~DataWriter() called.");
// NOTE(review): status = 1 presumably marks the last file of the sequence;
// the meaning of the value is not established by the visible code - confirm.
121 m_file_end_record.status = 1;
// closeFile() is only called when a file is actually open.
122 if(m_cFileOpen) closeFile();
126 ERS_DEBUG(1,
"DataWriter::~DataWriter() finished.");
// Common initialisation used by all constructors.
// Stores the arguments in the corresponding members, resets the bookkeeping
// state, extends the free metadata strings and finally opens the first file.
// Several parameters and statements are on lines not visible in this excerpt.
131 initDW(
const string& writingPath,
132 boost::shared_ptr<FileNameCallback> theFNCB,
137 const std::string&
stream,
138 const unsigned int lumiBlockNumber,
139 const std::string& applicationName,
140 const std::vector<std::string>& fmdStrings,
142 const unsigned int compLevel)
148 m_lumiBlockNumber = lumiBlockNumber;
149 m_applicationName = applicationName;
151 m_filenamecallback = theFNCB;
// -1 until putData_implementation records a real stream position.
158 m_latestPosition = -1;
160 m_writePath = writingPath;
// An empty next path means no directory change is pending (see closeFile()).
161 m_nextWritePath =
"";
// Strings written by file_record(m_file_name_strings) in openNextFile().
164 m_file_name_strings.appName = m_applicationName;
166 m_file_name_strings.fileNameCore = m_filenamecallback->getCoreName();
168 setRunParamsRecord(rPar);
// Both limits start at 0; putData_implementation only checks a limit when
// its value is greater than 0.
170 m_file_start_record.sizeLimit_MB = 0;
171 m_file_start_record.sizeLimit_dataBlocks = 0;
172 date_timeAsInt(m_file_start_record.date,m_file_start_record.time);
175 m_openFailed =
false;
// Start from the caller's metadata strings, then append the entries below.
179 m_fmdStrings = fmdStrings;
182 m_complevel = compLevel;
187 m_fmdStrings.push_back(
"Stream=" + m_stream );
188 m_fmdStrings.push_back(
"Project=" + m_project );
// The lumi block number is formatted through a stream to get its text form.
189 std::ostringstream o;
190 o << m_lumiBlockNumber;
191 m_fmdStrings.push_back(
"LumiBlock=" + o.str());
// Record the compression type as "<compressiontag>=<type name>".
194 m_fmdStrings.push_back(offline_EventStorage_v5::compressiontag +
195 "=" + offline_EventStorage_v5::type_to_string(m_compression));
// Reset the running Adler-32 checksum to its initial value.
198 m_check = ::adler32(0L, Z_NULL, 0);
// spaceForGuid() inserts an entry at the front of m_fmdStrings; after that
// the first output file is opened.
202 this->spaceForGuid();
203 this->openNextFile();
218 localtime_r( &a_time, &
t);
220 getDate= 1000000*
t.tm_mday+
233 if(m_openFailed)
return false;
237 return m_cFile.good();
// Sets the size limit (in MB) stored in the file start record.
// putData_implementation only evaluates this limit when it is greater than 0.
// Always returns EventStorage::DWOK.
248 DWError DataWriter::setMaxFileMB(
const unsigned int& maxFileMB)
250 m_file_start_record.sizeLimit_MB = maxFileMB;
251 return EventStorage::DWOK;
// Sets the limit on the number of data blocks (events) stored in the file
// start record. putData_implementation compares events_in_file against this
// limit only when it is greater than 0.
// Always returns EventStorage::DWOK.
254 DWError DataWriter::setMaxFileNE(
const unsigned int& maxFileNE)
256 m_file_start_record.sizeLimit_dataBlocks = maxFileNE;
257 return EventStorage::DWOK;
264 m_internal_run_parameters_record.marker = rPar.
marker;
265 m_internal_run_parameters_record.record_size = rPar.
record_size;
266 m_internal_run_parameters_record.run_number = rPar.
run_number;
267 m_internal_run_parameters_record.max_events = rPar.
max_events;
268 m_internal_run_parameters_record.rec_enable = rPar.
rec_enable;
269 m_internal_run_parameters_record.trigger_type = rPar.
trigger_type;
270 m_internal_run_parameters_record.detector_mask_1of2 = rPar.
detector_mask & 0xFFFFFFFF;
271 m_internal_run_parameters_record.detector_mask_2of2 = (rPar.
detector_mask>> 32);
272 m_internal_run_parameters_record.beam_type = rPar.
beam_type;
273 m_internal_run_parameters_record.beam_energy = rPar.
beam_energy;
// Writes one event given as a single buffer (size + pointer).
// The visible code forwards to the three-argument putData overload with
// exactly one iovec entry; the construction of iov and todisk is on lines
// that are not visible in this excerpt.
277 DWError DataWriter::putData(
const unsigned int&
dataSize,
const void *
event)
284 return this->putData(1, &
iov, todisk);
// Writes one event given as an array of iovec entries.
// Forwards to the three-argument putData overload; todisk is declared on a
// line that is not visible in this excerpt.
288 DWError DataWriter::putData(
const unsigned int&
entries,
const struct iovec * my_iovec)
291 return this->putData(
entries, my_iovec, todisk);
299 return this->putData(1, &
iov, sizeToDisk);
303 return this->putData_implementation(
entries, my_iovec, sizeToDisk);
// Single-buffer variant of putPrecompressedData.
// The visible code forwards to the iovec overload with one entry; the rest of
// the parameter list and the set-up of iov are not visible in this excerpt.
307 DWError DataWriter::putPrecompressedData(
const unsigned int&
dataSize,
313 return this->putPrecompressedData(1, &
iov);
// iovec variant of putPrecompressedData.
// Calls putData_implementation with precompressed = true, which makes that
// routine write the buffers as they are instead of compressing them.
// The rest of the parameter list and the declaration of todisk are not
// visible in this excerpt.
317 DWError DataWriter::putPrecompressedData(
const unsigned int&
entries,
321 return this->putData_implementation(
entries, my_iovec, todisk,
true);
// Writes one event, given as `entries` iovec buffers, to the current file.
//   entries       - number of elements in my_iovec
//   my_iovec      - buffers that make up the event
//   sizeToDisk    - output: number of bytes accounted as written to disk
//   precompressed - when true the buffers are written as they are, even if
//                   the writer is configured for ZLIB compression
// Returns DWNOOK if no file can be opened, if the stream is not good() before
// or after writing, or if compression fails; DWOK otherwise.
// Several statements (loops, size computation, what happens when a limit is
// reached) are on lines not visible in this excerpt.
325 DWError DataWriter::putData_implementation(
const unsigned int&
entries,
const iovec_const * my_iovec,
uint32_t& sizeToDisk,
bool precompressed){
327 ERS_DEBUG(3,
"DataWriter::putData called for an iovec.");
// Open a file on demand; give up if that fails or the stream is bad.
328 if(!m_cFileOpen) openNextFile();
330 if(!m_cFileOpen)
return EventStorage::DWNOOK;
332 if(!m_cFile.good())
return EventStorage::DWNOOK;
// Size limit check: only active when a limit greater than 0 has been set.
// The projected file size is rounded to the nearest MB before comparing.
// NOTE(review): the action taken when the limit is hit is not visible here
// (presumably a switch to the next file) - confirm.
346 if(m_file_start_record.sizeLimit_MB>0) {
347 double eventMB=
static_cast<double>(
dataSize)/(1024*1024);
348 double dataMB=m_cFileMB+eventMB;
349 if(
static_cast<unsigned int>(dataMB+0.5) >
350 m_file_start_record.sizeLimit_MB) {
351 ERS_DEBUG(3,
"Approaching file size limit in MB.");
// Event count limit check: again only active for a limit greater than 0.
356 if((m_file_start_record.sizeLimit_dataBlocks>0) &&
357 (m_file_end_record.events_in_file >= m_file_start_record.sizeLimit_dataBlocks)){
358 ERS_DEBUG(3,
"Approaching file size limit in number of events.");
// Compress into m_compressed when ZLIB is configured and the caller has not
// already compressed the data. A CompressionIssue is wrapped in a
// WritingIssue and the call fails with DWNOOK.
364 if(m_compression ==
ZLIB && !precompressed){
366 offline_EventStorage_v5::zlibcompress(*m_compressed, sizeToDisk,
368 }
catch(offline_EventStorage_v5::CompressionIssue& ex){
369 EventStorage::WritingIssue ci(ERS_HERE,
"Data compression failed.", ex);
371 return EventStorage::DWNOOK;
// Remember where this event starts in the file.
// NOTE(review): tellg() queries the get position although the stream is
// written to; presumably m_cFile is a std::fstream where get and put
// positions coincide - confirm, or consider tellp().
381 m_latestPosition = m_cFile.tellg();
// Uncompressed or precompressed data: write each iovec buffer as it is and
// fold the same bytes into the running Adler-32 checksum.
387 if(m_compression !=
ZLIB || precompressed){
389 m_cFile.write(
static_cast<const char*
>(my_iovec[
i].iov_base),
390 my_iovec[
i].iov_len);
392 m_check = ::adler32(m_check,
393 static_cast<const Bytef *
>(my_iovec[
i].iov_base),
394 my_iovec[
i].iov_len);
// Compressed data: write sizeToDisk bytes from the compression buffer and
// update the checksum from that buffer.
397 m_cFile.write(
static_cast<const char *
>(m_compressed->handle()), sizeToDisk);
398 m_check = ::adler32(m_check,
399 static_cast<const Bytef *
>(m_compressed->handle()),
// Bookkeeping: accumulated sizes in MB (as doubles) and event counters for
// the current file and for the whole run.
404 m_cFileMB +=
static_cast<double>(sizeToDisk)/(1024*1024);
405 m_runMB +=
static_cast<double>(sizeToDisk)/(1024*1024);
406 m_file_end_record.events_in_file++;
407 m_file_end_record.events_in_run++;
409 ERS_DEBUG(3,
"Event "<< m_file_end_record.events_in_run<<
" is written at offset "<<m_latestPosition);
// A stream failure during the write turns the result into DWNOOK.
411 if(!m_cFile.good())
return EventStorage::DWNOOK;
413 return EventStorage::DWOK;
419 return m_latestPosition;
// Returns the events_in_file counter of the file end record. The counter is
// incremented in putData_implementation and reset to 0 in openNextFile().
423 unsigned int DataWriter::eventsInFile()
const
425 return m_file_end_record.events_in_file;
// Returns the events_in_run counter of the file end record. The counter is
// incremented in putData_implementation together with events_in_file; no
// reset of it is visible in this excerpt.
428 unsigned int DataWriter::eventsInFileSequence()
const
430 return m_file_end_record.events_in_run;
// Returns the amount of data accounted to the current file, in MB.
// m_cFileMB is a fractional value; adding 0.5 before the cast rounds it to
// the nearest whole MB.
433 unsigned int DataWriter::dataMB_InFile()
const
435 return static_cast<unsigned int>(m_cFileMB+0.5);
// Returns the amount of data accounted to the whole run, in MB.
// m_runMB is a fractional value; adding 0.5 before the cast rounds it to the
// nearest whole MB.
438 unsigned int DataWriter::dataMB_InFileSequence()
const
440 return static_cast<unsigned int>(m_runMB+0.5);
// Finishes the current file: fills in the file end record, formats and resets
// the checksum, notifies the registered callback, applies a pending
// directory change and advances the file name callback.
// Returns DWNOOK after a "Rename failed." issue and on the final visible
// return; returns DWOK from the ES_SingleFile handler.
// Many statements (writing the end record, closing and renaming the file)
// are on lines not visible in this excerpt.
444 DWError DataWriter::closeFile()
446 ERS_DEBUG(3,
"DataWriter::closeFile() called.");
// Store the accumulated sizes, rounded to the nearest MB, and the close time.
449 m_file_end_record.data_in_file =
static_cast<unsigned int>(m_cFileMB+0.5);
450 m_file_end_record.data_in_run =
static_cast<unsigned int>(m_runMB+0.5);
452 date_timeAsInt(m_file_end_record.date,m_file_end_record.time);
// Format the Adler-32 checksum as upper-case hexadecimal text, then reset the
// running checksum to its initial value for the next file.
458 std::ostringstream oss;
461 oss << std::hex << std::uppercase <<m_check;
462 std::string checksum = oss.str();
463 m_check = ::adler32(0L, Z_NULL, 0);
// Failure path of a rename step whose call is not visible in this excerpt.
470 EventStorage::WritingIssue ci(ERS_HERE,
"Rename failed.");
472 return EventStorage::DWNOOK;
// Tell the registered callback, if any, that the file was closed.
// Part of the argument list is not visible in this excerpt.
479 if(m_callBack != NULL) {
480 ERS_DEBUG(3,
"Execute callback from DataWriter.");
481 m_callBack->FileWasClosed(
482 m_filenamecallback->getCurrentFileName(),
489 m_file_end_record.events_in_file,
490 m_internal_run_parameters_record.run_number,
491 m_file_start_record.file_number,
// A non-empty m_nextWritePath is a pending directory change; it becomes the
// active write path now that the file is closed.
498 if(m_nextWritePath !=
"") {
499 m_writePath=m_nextWritePath;
501 ERS_DEBUG(3,
"New writing path became effective. The path is "
// Move the file name callback on to the next file. ES_SingleFile is caught
// here; the visible handler code returns DWOK.
507 m_filenamecallback->advance();
508 }
catch(EventStorage::ES_SingleFile &
e){
512 return EventStorage::DWOK;
514 return EventStorage::DWNOOK;
// Closes the current file and moves on to the next one.
// The result of closeFile() is kept in `we` and replaced by DWNOOK when
// m_openFailed is set. The statements in between (presumably the call that
// opens the next file) and the final return are not visible in this excerpt.
518 DWError DataWriter::nextFile()
520 ERS_DEBUG(2,
"DataWriter::nextFile() called.");
522 DWError we=closeFile();
531 if(m_openFailed) we=EventStorage::DWNOOK;
537 ifstream
test(
name.c_str(),ios::binary | ios::in);
538 bool isThere =
test.good();
// Opens the next output file and writes its header records.
// Resets the per-file event counter and checksum, determines the file number,
// writes the file name strings and the metadata strings, updates
// m_cFileOpen / m_openFailed from the stream state and notifies the callback.
// Many statements (name construction, existence checks, the actual open) are
// on lines not visible in this excerpt.
544 void DataWriter::openNextFile()
547 m_file_end_record.events_in_file=0;
// Take the file number from the file name callback; when that throws
// ES_SingleFile the previous number is incremented instead.
550 m_file_start_record.file_number = m_filenamecallback->getIndex();
551 }
catch(EventStorage::ES_SingleFile &
e){
552 m_file_start_record.file_number++;
554 date_timeAsInt(m_file_start_record.date,m_file_start_record.time);
// Issues built from an error text `err`; the conditions that lead here are
// not visible in this excerpt.
564 EventStorage::WritingIssue ci(ERS_HERE,
err.c_str());
570 EventStorage::WritingIssue ci(ERS_HERE,
err.c_str());
// Inform the naming callback about a name clash, re-read the index and report
// the new file number.
573 m_filenamecallback->fileAlreadyExists();
575 m_file_start_record.file_number = m_filenamecallback->getIndex();
578 oss <<
"increase file number to ";
579 oss << m_file_start_record.file_number;
580 EventStorage::WritingIssue ci(ERS_HERE, oss.str().c_str());
583 ERS_DEBUG(2,
"OK to write file with number "<<m_file_start_record.file_number);
// Start the checksum of the new file from the Adler-32 initial value.
591 m_check = ::adler32(0L, Z_NULL, 0);
// Header: file name strings first, then the metadata strings if any exist.
594 file_record(m_file_name_strings);
598 ERS_DEBUG(3,
"Writing "<<m_fmdStrings.size()<<
" metadata strings.");
599 if(m_fmdStrings.size()>0) file_record(m_fmdStrings);
601 ERS_DEBUG(3,
"Status of the file. good() returns "<<m_cFile.good());
// The stream state after writing the header decides whether the file counts
// as open; m_openFailed is the exact opposite flag.
602 m_cFileOpen = m_cFile.good();
603 m_openFailed = !m_cFile.good();
// Notify the registered callback only for a successfully opened file.
// Part of the argument list is not visible in this excerpt.
605 if((m_callBack != NULL) && m_cFileOpen) {
606 ERS_DEBUG(3,
"Execute callback from DataWriter.");
607 m_callBack->FileWasOpened(
608 m_filenamecallback->getCurrentFileName(),
614 m_internal_run_parameters_record.run_number,
615 m_file_start_record.file_number,
624 n << m_writePath <<
"/";
625 n << m_filenamecallback->getCurrentFileName((
FINISHED ==
fs)?
false:
true);
626 string name =
n.str();
631 m_nextWritePath = writingPath;
632 ERS_DEBUG(3,
"Directory changed to "<<m_nextWritePath
633 <<
" (effective for hext file).");
// Reports whether a directory change is pending: inT is set to true when
// m_nextWritePath is non-empty and differs from the current m_writePath.
// The declaration of inT and the return statement are not visible in this
// excerpt.
636 bool DataWriter::inTransition()
const {
639 if((m_nextWritePath !=
"") &&
640 (m_nextWritePath != m_writePath)) inT=
true;
// Writes a raw record to the file and folds it into the checksum.
// The visible code writes one uint32_t-sized word at record+i and updates
// m_check with the same bytes; the definition of `record`, the loop over `i`
// and the use of the second parameter `pi` are not visible in this excerpt.
645 void DataWriter::file_record(
void *ri,
const void *
pi) {
656 m_cFile.write((
char *)(record+
i),
sizeof(
uint32_t));
657 m_check = ::adler32(m_check,(
const Bytef*)(record+
i),
sizeof(
uint32_t));
665 const char *cName = nst.
appName.c_str();
672 m_check = ::adler32(m_check,(
const Bytef*)
676 m_cFile.write((
char *)(&sizeName),
sizeof(
uint32_t));
677 m_check = ::adler32(m_check,(
const Bytef*)
680 m_cFile.write(cName,sizeName);
681 m_check = ::adler32(m_check,(
const Bytef*)
685 char ns = sizeName % 4;
687 m_cFile.write(
" ",4-
ns);
688 m_check = ::adler32(m_check,(
const Bytef*)
" ",4-
ns);
691 m_cFile.write((
char *)(&sizeTag),
sizeof(
uint32_t));
692 m_check = ::adler32(m_check,(
const Bytef*)(&sizeTag),
sizeof(
uint32_t));
693 m_cFile.write(cTag,sizeTag);
694 m_check = ::adler32(m_check,(
const Bytef*)cTag,sizeTag);
698 m_cFile.write(
" ",4-
ns);
699 m_check = ::adler32(m_check,(
const Bytef*)
" ",4-
ns);
706 ERS_DEBUG(2,
"Writing the metadata strings.");
709 m_check = ::adler32(m_check, (
const Bytef*)
713 uint32_t nstrings = fmdStrings.size();
714 m_cFile.write((
char *)(&nstrings),
sizeof(
uint32_t));
715 m_check = ::adler32(m_check, (
const Bytef*)
719 vector<string>::const_iterator
it;
720 for(
it=fmdStrings.begin();
it!=fmdStrings.end(); ++
it) {
722 const char *cst =
it->c_str();
725 m_cFile.write((
char *)(&slen),
sizeof(
uint32_t));
726 m_check = ::adler32(m_check,(
const Bytef*)(&slen),
sizeof(
uint32_t));
727 m_cFile.write(cst,slen);
728 m_check = ::adler32(m_check,(
const Bytef*)cst,slen);
731 m_cFile.write(
" ",4-
ns);
732 m_check = ::adler32(m_check,(
const Bytef*)
" ",4-
ns);
// Registers the user's callback object (a null pointer is stored as given).
// A non-null callback is notified right away through FileWasOpened().
// Part of that call's argument list is not visible in this excerpt.
// NOTE(review): the pointer is stored as given; ownership and lifetime are
// not established by the visible code - confirm with callers.
737 void DataWriter::registerCallBack(DataWriterCallBack *pUserCBclass) {
738 m_callBack = pUserCBclass;
739 if(m_callBack != NULL) {
740 m_callBack->FileWasOpened(
741 m_filenamecallback->getCurrentFileName(),
747 m_internal_run_parameters_record.run_number,
748 m_file_start_record.file_number,
// Inserts the string `e` at the front of the metadata strings.
// The definition of `e` is not visible in this excerpt; presumably it is a
// placeholder entry for the GUID that replaceGuid() fills in later - confirm.
754 void DataWriter::spaceForGuid() {
756 m_fmdStrings.insert(m_fmdStrings.begin(),
e);
// Chooses the GUID for the file and builds its "GUID=<value>" metadata text.
// When m_next_guid is empty the value comes from g.toString() (the creation
// of `g` is not visible in this excerpt); otherwise m_next_guid is used.
// What is done with e1 afterwards is also not visible here.
759 void DataWriter::replaceGuid() {
762 if (m_next_guid ==
""){
764 m_guid =
g.toString();
766 m_guid = m_next_guid;
770 string e1=
"GUID=" + m_guid;
// Accepts a GUID string from the caller.
// NOTE(review): the body is not visible in this excerpt; presumably it stores
// Guid in m_next_guid, which replaceGuid() reads - confirm.
774 void DataWriter::setGuid(
const std::string&
Guid){
782 if (::stat64(fileNameCore.c_str(), &
tmp) == -1) {
783 EventStorage::WritingIssue ci(ERS_HERE,
"stat64 failed.");
794 return m_compression;