ATLAS Offline Software
ByteStreamMergeOutputSvc.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
4 
7 
8 #include "GaudiKernel/GenericAddress.h"
9 #include "GaudiKernel/MsgStream.h"
10 #include "GaudiKernel/ISvcLocator.h"
11 
12 #include "eformat/SourceIdentifier.h"
13 
14 #include <map>
15 
17 typedef std::map<uint32_t, ROBF*> ROBMAP;
18 
19 // Constructor: forwards the service name and the service locator to base_class.
20 ByteStreamMergeOutputSvc::ByteStreamMergeOutputSvc(const std::string& name, ISvcLocator* svcloc) :
21  base_class(name,svcloc)
22 {
// NOTE(review): listing line 24 is absent from this extract. The suppression below
// presumably guards a member assignment that stood directly after it — confirm
// against the original file before relying on this body being complete.
23  // cppcheck-suppress useInitializationList; deprecated Property constructor
25 }
26 
27 // Destructor.
29 }
30 
31 // setup input and output paths
33  ATH_CHECK( m_outSvc.retrieve() );
34  ATH_CHECK( m_inSvc.retrieve() );
35 
36  return(StatusCode::SUCCESS);
37 }
38 
39 // ROBs from the L2 and event filter needs to be treated specially since the module id
40 // is used online to identify which node processed an event and it therefore varies from
41 // event to event - here we remap it to have module id 0, since there should only be one per event
43  eformat::helper::SourceIdentifier id = eformat::helper::SourceIdentifier(source_id);
44  if (id.subdetector_id() == eformat::TDAQ_LVL2 || id.subdetector_id() == eformat::TDAQ_EVENT_FILTER) {
45  return(eformat::helper::SourceIdentifier(id.subdetector_id(), 0).code());
46  } else {
47  return(source_id);
48  }
49 }
50 
51 // Read the next event.
53  // get original event
54  const RawEvent* orgEvent = m_inSvc->currentEvent();
55 
56  ATH_MSG_DEBUG("original BS size = " << 4 * orgEvent->fragment_size_word());
57  ATH_MSG_DEBUG("athena BS size = " << 4 * newEvent->fragment_size_word());
58 
59  // do the merge...
60  // get all the ROBFragments
61  const size_t MAX_ROBFRAGMENTS = 2048;
62  std::vector<OFFLINE_FRAGMENTS_NAMESPACE::PointerType> orgRobF(MAX_ROBFRAGMENTS);
63  std::vector<OFFLINE_FRAGMENTS_NAMESPACE::PointerType> newRobF(MAX_ROBFRAGMENTS);
64  size_t orgrobcount = orgEvent->children(orgRobF.data(), MAX_ROBFRAGMENTS);
65  if (orgrobcount == MAX_ROBFRAGMENTS) {
66  ATH_MSG_ERROR("ROB buffer overflow");
67  return false;
68  }
69  size_t newrobcount = newEvent->children(newRobF.data(),MAX_ROBFRAGMENTS);
70  if (newrobcount == MAX_ROBFRAGMENTS) {
71  ATH_MSG_ERROR("ROB buffer overflow");
72  return false;
73  }
74 
75  ROBMAP robsToAdd;
76  // loop over all ROBs
77  for (size_t irob = 0; irob < orgrobcount; ++irob) {
78  ROBF* rob = new ROBF(orgRobF[irob]);
79  robsToAdd[reducedROBid(rob->source_id())] = rob;
80  ATH_MSG_DEBUG("original ROBFragment, src ID = " << std::hex << rob->source_id());
81  }
82  // now add/overwrite with newly created robs
83  for (size_t irob = 0; irob < newrobcount; ++irob) {
84  ROBF* rob = new ROBF(newRobF[irob]);
85  ROBMAP::const_iterator it = robsToAdd.find(reducedROBid(rob->source_id()));
86  if (it != robsToAdd.end()) {
87  delete it->second;
88  ATH_MSG_DEBUG("overwriting ROBFragment with src ID = " << std::hex << rob->source_id());
89  }
90  robsToAdd[reducedROBid(rob->source_id())] = rob;
91  ATH_MSG_DEBUG("new ROBFragment, src ID = " << std::hex << rob->source_id());
92  }
93  RawEventWrite* mergedEventWrite = new RawEventWrite();
94  // copy header
95  const RawEvent *event = orgEvent;
96  if (m_overwriteHeader) {
97  event = newEvent;
98  }
99  mergedEventWrite->source_id(event->source_id());
100  mergedEventWrite->bc_time_seconds(event->bc_time_seconds());
101  mergedEventWrite->bc_time_nanoseconds(event->bc_time_nanoseconds());
102  mergedEventWrite->global_id(event->global_id());
103  mergedEventWrite->run_type(event->run_type());
104  mergedEventWrite->run_no(event->run_no());
105  mergedEventWrite->lumi_block(event->lumi_block());
106  mergedEventWrite->lvl1_id(event->lvl1_id());
107  mergedEventWrite->bc_id(event->bc_id());
108  mergedEventWrite->lvl1_trigger_type(event->lvl1_trigger_type());
110  event->status(tmp);
111  mergedEventWrite->status(event->nstatus(), tmp);
112  event->lvl1_trigger_info(tmp);
113  mergedEventWrite->lvl1_trigger_info(event->nlvl1_trigger_info(), tmp);
114  event->lvl2_trigger_info(tmp);
115  mergedEventWrite->lvl2_trigger_info(event->nlvl2_trigger_info(), tmp);
116  event->event_filter_info(tmp);
117  mergedEventWrite->event_filter_info(event->nevent_filter_info(), tmp);
118  event->stream_tag(tmp);
119  mergedEventWrite->stream_tag(event->nstream_tag(), tmp);
120  mergedEventWrite->checksum_type(event->checksum_type());
121  // copy robs
122  for(ROBMAP::iterator it = robsToAdd.begin(), itEnd = robsToAdd.end(); it != itEnd; ++it) {
123  mergedEventWrite->append(it->second);
124  }
125  // convert RawEventWrite to RawEvent
126  uint32_t rawSize = mergedEventWrite->size_word();
128  uint32_t count = eformat::write::copy(*(mergedEventWrite->bind()), buffer, rawSize);
129  if (count != rawSize) {
130  ATH_MSG_ERROR("Memcopy failed " << count << " " << rawSize);
131  return false;
132  }
133  RawEvent newRawEvent(buffer);
134  StatusCode sc = m_outSvc->putEvent(&newRawEvent) ? StatusCode::SUCCESS : StatusCode::FAILURE;
135  for(ROBMAP::iterator it = robsToAdd.begin(), itEnd = robsToAdd.end(); it != itEnd; ++it) {
136  delete it->second; it->second = 0;
137  }
138  delete mergedEventWrite; mergedEventWrite = 0;
139  delete [] buffer;
140  if (sc != StatusCode::SUCCESS) {
141  ATH_MSG_ERROR("Failed to put RawEvent");
142  }
143  return(true);
144 }
145 
146 bool ByteStreamMergeOutputSvc::putEvent(const RawEvent* /*re*/, const EventContext& /*ctx*/) {
147  ATH_MSG_FATAL(name() << " does not implement the context-aware putEvent method");
148  return false;
149 }
xAOD::iterator
JetConstituentVector::iterator iterator
Definition: JetConstituentVector.cxx:68
RawEventWrite
OFFLINE_FRAGMENTS_NAMESPACE_WRITE::FullEventFragment RawEventWrite
data type for writing raw event
Definition: RawEvent.h:39
ATH_MSG_FATAL
#define ATH_MSG_FATAL(x)
Definition: AthMsgStreamMacros.h:34
OFFLINE_FRAGMENTS_NAMESPACE::DataType
uint32_t DataType
Definition: RawEvent.h:24
xAOD::uint32_t
setEventNumber uint32_t
Definition: EventInfo_v1.cxx:127
ByteStreamMergeOutputSvc::m_outSvc
ServiceHandle< IByteStreamOutputSvc > m_outSvc
Definition: ByteStreamMergeOutputSvc.h:39
RawEvent
OFFLINE_FRAGMENTS_NAMESPACE::FullEventFragment RawEvent
data type for reading raw event
Definition: RawEvent.h:37
skel.it
it
Definition: skel.GENtoEVGEN.py:407
XMLtoHeader.count
count
Definition: XMLtoHeader.py:84
AthenaPoolTestRead.sc
sc
Definition: AthenaPoolTestRead.py:27
ByteStreamMergeOutputSvc::putEvent
virtual bool putEvent(const RawEvent *re) override
Implementation of the IByteStreamOutputSvc interface methods.
Definition: ByteStreamMergeOutputSvc.cxx:52
ByteStreamMergeOutputSvc::ByteStreamMergeOutputSvc
ByteStreamMergeOutputSvc(const std::string &name, ISvcLocator *svcloc)
Constructors:
Definition: ByteStreamMergeOutputSvc.cxx:20
ROBF
OFFLINE_FRAGMENTS_NAMESPACE_WRITE::ROBFragment ROBF
Definition: ByteStreamMergeOutputSvc.cxx:16
ByteStreamMergeOutputSvc::m_inSvc
ServiceHandle< IByteStreamInputSvc > m_inSvc
Definition: ByteStreamMergeOutputSvc.h:38
ByteStreamMergeOutputSvc.h
This file contains the class definition for the ByteStreamMergeOutputSvc class.
histSizes.code
code
Definition: histSizes.py:129
createCoolChannelIdFile.buffer
buffer
Definition: createCoolChannelIdFile.py:11
ATH_MSG_ERROR
#define ATH_MSG_ERROR(x)
Definition: AthMsgStreamMacros.h:33
event
POOL::TEvent event(POOL::TEvent::kClassAccess)
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ATH_MSG_DEBUG
#define ATH_MSG_DEBUG(x)
Definition: AthMsgStreamMacros.h:29
ByteStreamMergeOutputSvc::initialize
virtual StatusCode initialize() override
Definition: ByteStreamMergeOutputSvc.cxx:32
ByteStreamMergeOutputSvc::~ByteStreamMergeOutputSvc
virtual ~ByteStreamMergeOutputSvc()
Destructor.
Definition: ByteStreamMergeOutputSvc.cxx:28
ByteStreamMergeOutputSvc::reducedROBid
uint32_t reducedROBid(uint32_t)
Definition: ByteStreamMergeOutputSvc.cxx:42
ByteStreamMergeOutputSvc::m_bsOutputStreamName
Gaudi::Property< std::string > m_bsOutputStreamName
Definition: ByteStreamMergeOutputSvc.h:41
ByteStreamMergeOutputSvc::m_overwriteHeader
Gaudi::Property< bool > m_overwriteHeader
Definition: ByteStreamMergeOutputSvc.h:42
ATH_CHECK
#define ATH_CHECK
Definition: AthCheckMacros.h:40
DeMoUpdate.tmp
string tmp
Definition: DeMoUpdate.py:1167
name
std::string name
Definition: Control/AthContainers/Root/debug.cxx:240
ByteStream.h
OFFLINE_FRAGMENTS_NAMESPACE_WRITE::ROBFragment
eformat::write::ROBFragment ROBFragment
Definition: RawEvent.h:33
calibdata.copy
bool copy
Definition: calibdata.py:26
ROBMAP
std::map< uint32_t, ROBF * > ROBMAP
Definition: ByteStreamMergeOutputSvc.cxx:17