ATLAS Offline Software
ClusterMessage.cxx
Go to the documentation of this file.
1 /*
2  Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
3 */
5 #include <cstdint>
6 
// Initializer list and body of what appears to be the DataDescr move
// constructor (the signature line is not part of this listing): every field is
// taken over from rhs.
8  : ptr(rhs.ptr),
9  len(rhs.len),
10  align(rhs.align),
11  received(rhs.received),
12  evtNumber(rhs.evtNumber),
13  fileNumber(rhs.fileNumber) {
   // Detach rhs from the buffer: with a null ptr and received == false it no
   // longer refers to the memory now held by this object. evtNumber and
   // fileNumber of rhs are left as they were.
14  rhs.ptr = nullptr;
15  rhs.len = 0;
16  rhs.align = 0;
17  rhs.received = false;
18 }
// Rebuilds a descriptor from the words of a wire-message body. Consecutive
// pairs of words (high word first) form ptr, len, align, evtNumber and
// fileNumber; the descriptor is marked as received.
// Assuming body is a WireMsgBody (std::array<int, 10>), each low word must be
// narrowed to std::uint32_t before widening: converting a negative int
// straight to std::uint64_t sign-extends it, and the sum with the shifted high
// word then comes out 2^32 too small whenever bit 31 of the low word is set.
20  : ptr(reinterpret_cast<void*>((std::uint64_t(body[0]) << 32) +
21  std::uint64_t(std::uint32_t(body[1])))),
22  len((std::uint64_t(body[2]) << 32) + std::uint64_t(std::uint32_t(body[3]))),
23  align((std::uint64_t(body[4]) << 32) + std::uint64_t(std::uint32_t(body[5]))),
24  received(true),
25  evtNumber((std::uint64_t(body[6]) << 32) + std::uint64_t(std::uint32_t(body[7]))),
26  fileNumber((std::uint64_t(body[8]) << 32) + std::uint64_t(std::uint32_t(body[9]))) {}
27 
// Destructor body: the buffer is released with std::free only when this
// descriptor is marked as received and still holds a non-null pointer.
29  if (received && (ptr != nullptr)) {
30  std::free(ptr);
31  }
32 }
33 
35  DataDescr&& rhs) noexcept {
    // Move assignment: takes over every field of rhs and leaves rhs empty.
    // Returns *this.
    // Self-move must be a no-op; the resets at the end would otherwise null
    // out the only copy of the pointer.
    if (this == &rhs) {
      return *this;
    }
    // Release the buffer this object holds before overwriting ptr, using the
    // same condition as the destructor; otherwise that buffer would leak.
    if (received && (ptr != nullptr)) {
      std::free(ptr);
    }
36  ptr = rhs.ptr;
37  len = rhs.len;
38  align = rhs.align;
39  received = rhs.received;
40  evtNumber = rhs.evtNumber;
41  fileNumber = rhs.fileNumber;
    // Detach rhs so that its destructor does not free the transferred buffer.
42  rhs.ptr = nullptr;
43  rhs.len = 0;
44  rhs.align = 0;
45  rhs.received = false;
46  rhs.evtNumber = 0;
47  rhs.fileNumber = 0;
48  return *this;
49 }
50 
// ClusterMessage constructor overload that stores mType and payload.
// NOTE(review): the signature and the condition guarding the throw (listing
// lines 51, 53 and 54) are absent from this listing - confirm against the real
// source which message types this overload accepts.
52  : messageType(mType), payload(payload) {
   // Raised when this overload is used with a message type it does not accept
55  throw std::logic_error{std::format(
56  "Incorrect ClusterMessage constructor used for message type {}",
57  mType)};
58  }
59 }
60 
// ClusterMessage constructor overload that stores mType and payload (the
// signature line is not part of this listing). Only
// ClusterMessageType::ProvideEvent is accepted; any other type throws
// std::logic_error.
62  : messageType(mType), payload(payload) {
63  if (mType != ClusterMessageType::ProvideEvent) {
64  throw std::logic_error{std::format(
65  "Incorrect ClusterMessage constructor used for message type {}",
66  mType)};
67  }
68 }
69 
// ClusterMessage constructor overload that stores mType and moves payload into
// the message (the signature line is not part of this listing). Only
// ClusterMessageType::Data is accepted; any other type throws std::logic_error.
71  : messageType(mType), payload(std::move(payload)) {
72  if (mType != ClusterMessageType::Data) {
73  throw std::logic_error{std::format(
74  "Incorrect ClusterMessage constructor used for message type {}",
75  mType)};
76  }
77 }
78 
// Body of a ClusterMessage constructor overload that validates mType with a
// switch: the listed cases are accepted as they are, every other type throws
// std::logic_error.
// NOTE(review): the signature and the case labels (listing lines 79 and 81-83)
// are absent from this listing - confirm the accepted types against the real
// source.
80  switch (mType) {
84  // OK
85  break;
86  default:
87  throw std::logic_error{std::format(
88  "Incorrect ClusterMessage constructor used for message type {}",
89  mType)};
90  }
91 }
92 
94 
// Decodes a received wire message into this ClusterMessage: header[0] carries
// the message type and header[1] the source. The payload comes from the body
// when one is attached, otherwise from header[2].
96  const auto& [header, body] = wire_msg;
97  messageType = static_cast<ClusterMessageType>(header[0]);
98  source = header[1];
99  if (body.has_value()) {
100  const auto& body_2 = *body;
   // NOTE(review): listing lines 101 and 104 are absent here; presumably they
   // choose between the two decodings below and declare `status` - confirm
   // against the real source.
102  payload = DataDescr(body_2);
103  } else {
   // Status payload: words 0-3 of the body hold the status code and the
   // created / finished / skipped event counters.
105  status.status = StatusCode(body_2[0]);
106  status.createdEvents = body_2[1];
107  status.finishedEvents = body_2[2];
108  status.skippedEvents = body_2[3];
109  payload = status;
110  }
111  } else {
   // Without a body, only a non-negative header[2] is taken as the payload;
   // a negative value leaves payload untouched.
112  if (header[2] >= 0) {
113  payload = header[2];
114  }
115  }
116 }
117 
// Body of wire_msg(): serialises this message into a WireMsg (header plus
// optional body).
//   header[0] = message type, header[1] = source.
//   DataDescr / WorkerStatus payload: header[2] = running tag, body attached.
//   int payload: header[2] = that int, no body. Anything else: header[2] = -1.
// The payload alternative is tested by type (std::holds_alternative) so that
// each test visibly matches the std::get<T> that follows it.
   // Largest tag value handed out; NOTE(review): presumably chosen to fit the
   // transport's tag range - confirm.
119  constexpr int max_tag = 16383;
120  constexpr std::uint64_t lower32 = 0xFFFFFFFF;
121 
   // Running tag, cycling through 1..max_tag (the first tag handed out is 2).
122  static thread_local int next_msg =
123  1; // This is only ever called from one thread per process
124  WireMsgHdr header{};
125  header[0] = int(messageType);
126  header[1] = source;
127  if (std::holds_alternative<DataDescr>(payload)) {
128  next_msg = (next_msg % max_tag) + 1;
129  header[2] = next_msg;
130  WireMsgBody body{};
131  const auto& payload_local = std::get<DataDescr>(payload);
   // Each 64-bit quantity is split into two words, high word first:
   // [0,1] ptr, [2,3] len, [4,5] align, [6,7] evtNumber, [8,9] fileNumber.
132  body[0] = int(std::uint64_t(payload_local.ptr) >> 32);
133  body[1] = int(std::uint64_t(payload_local.ptr) & lower32);
134  body[2] = int(std::uint64_t(payload_local.len) >> 32);
135  body[3] = int(std::uint64_t(payload_local.len) & lower32);
136  body[4] = int(std::uint64_t(payload_local.align) >> 32);
137  body[5] = int(std::uint64_t(payload_local.align) & lower32);
138  body[6] = int(std::uint64_t(payload_local.evtNumber) >> 32);
139  body[7] = int(std::uint64_t(payload_local.evtNumber) & lower32);
140  body[8] = int(std::uint64_t(payload_local.fileNumber) >> 32);
141  body[9] = int(std::uint64_t(payload_local.fileNumber) & lower32);
142  WireMsg msg{header, std::make_optional(body)};
143  return msg;
144  }
145  if (std::holds_alternative<WorkerStatus>(payload)) {
146  next_msg = (next_msg % max_tag) + 1;
147  header[2] = next_msg;
148  WireMsgBody body{};
149  const auto& payload_local = std::get<WorkerStatus>(payload);
   // Words 0-3 carry the status code and the three event counters; the
   // remaining words are zeroed.
150  body[0] = static_cast<int>(payload_local.status.getCode());
151  body[1] = payload_local.createdEvents;
152  body[2] = payload_local.finishedEvents;
153  body[3] = payload_local.skippedEvents;
154  body[4] = body[5] = body[6] = body[7] = body[8] = body[9] = 0;
155  WireMsg msg{header, std::make_optional(body)};
156  return msg;
157  }
158  // else
159  if (std::holds_alternative<int>(payload)) { // if we have an int payload
160  header[2] = std::get<int>(payload);
161  } else {
   // -1 marks "no payload"
162  header[2] = -1;
163  }
164  WireMsg msg{header, std::nullopt};
165  return msg;
166 }
ClusterMessage::payload
Payload_t payload
Definition: ClusterMessage.h:78
ClusterMessageType::Data
@ Data
header
Definition: hcg.cxx:526
vtune_athena.format
format
Definition: vtune_athena.py:14
ClusterMessage::WorkerStatus
Definition: ClusterMessage.h:43
ClusterMessage::DataDescr::DataDescr
DataDescr(const T *ptr, std::size_t count=1)
Definition: ClusterMessage.h:61
ClusterMessageType::RequestEvent
@ RequestEvent
dbg::ptr
void * ptr(T *p)
Definition: SGImplSvc.cxx:74
ClusterMessageType
An enum class defining what type of message this is.
ClusterMessage::ClusterMessage
ClusterMessage()
ClusterMessageType::EmergencyStop
@ EmergencyStop
ClusterMessage::WireMsgBody
std::array< int, 10 > WireMsgBody
Definition: ClusterMessage.h:37
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
ClusterMessage::DataDescr::operator=
DataDescr & operator=(DataDescr &&rhs) noexcept
Definition: ClusterMessage.cxx:34
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
ClusterMessageType::FinalWorkerStatus
@ FinalWorkerStatus
ClusterMessage::wire_msg
WireMsg wire_msg() const
Definition: ClusterMessage.cxx:118
ClusterMessage::messageType
ClusterMessageType messageType
Definition: ClusterMessage.h:41
ClusterMessage::DataDescr::~DataDescr
~DataDescr()
Definition: ClusterMessage.cxx:28
ClusterMessage::source
int source
Definition: ClusterMessage.h:40
ClusterMessageType::WorkerError
@ WorkerError
ClusterMessage::DataDescr
Definition: ClusterMessage.h:50
PixelModuleFeMask_create_db.payload
string payload
Definition: PixelModuleFeMask_create_db.py:69
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
ClusterMessage::WireMsgHdr
std::array< int, 3 > WireMsgHdr
Definition: ClusterMessage.h:36
ClusterMessageType::EventsDone
@ EventsDone
ClusterMessage::WireMsg
std::tuple< WireMsgHdr, std::optional< WireMsgBody > > WireMsg
Definition: ClusterMessage.h:38
merge.status
status
Definition: merge.py:16
makeTOC.header
header
Definition: makeTOC.py:28
ClusterMessageType::ProvideEvent
@ ProvideEvent
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7
ClusterMessage.h