ATLAS Offline Software
Loading...
Searching...
No Matches
ClusterMessage.cxx
Go to the documentation of this file.
1/*
2 Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
3*/
5#include <cstdint>
6
8 : ptr(rhs.ptr),
9 len(rhs.len),
10 align(rhs.align),
11 received(rhs.received),
12 evtNumber(rhs.evtNumber),
13 fileNumber(rhs.fileNumber) {
14 rhs.ptr = nullptr;
15 rhs.len = 0;
16 rhs.align = 0;
17 rhs.received = false;
18}
20 : ptr(reinterpret_cast<void*>((std::uint64_t(body[0]) << 32) +
21 std::uint64_t(body[1]))),
22 len((std::uint64_t(body[2]) << 32) + std::uint64_t(body[3])),
23 align((std::uint64_t(body[4]) << 32) + std::uint64_t(body[5])),
24 received(true),
25 evtNumber((std::uint64_t(body[6]) << 32) + std::uint64_t(body[7])),
26 fileNumber((std::uint64_t(body[8]) << 32) + std::uint64_t(body[9])) {}
27
29 if (received) {
30 std::free(ptr);
31 }
32}
33
35 DataDescr&& rhs) noexcept {
36 if (received) {
37 std::free(ptr); //release the object memory before assigning a new one
38 }
39 ptr = rhs.ptr;
40 len = rhs.len;
41 align = rhs.align;
42 received = rhs.received;
43 evtNumber = rhs.evtNumber;
44 fileNumber = rhs.fileNumber;
45 rhs.ptr = nullptr;
46 rhs.len = 0;
47 rhs.align = 0;
48 rhs.received = false;
49 rhs.evtNumber = 0;
50 rhs.fileNumber = 0;
51 return *this;
52}
53
55 : messageType(mType), payload(payload) {
56 if (mType != ClusterMessageType::FinalWorkerStatus &&
57 mType != ClusterMessageType::WorkerError) {
58 throw std::logic_error{std::format(
59 "Incorrect ClusterMessage constructor used for message type {}",
60 mType)};
61 }
62}
63
65 : messageType(mType), payload(payload) {
66 if (mType != ClusterMessageType::ProvideEvent) {
67 throw std::logic_error{std::format(
68 "Incorrect ClusterMessage constructor used for message type {}",
69 mType)};
70 }
71}
72
74 : messageType(mType), payload(std::move(payload)) {
75 if (mType != ClusterMessageType::Data) {
76 throw std::logic_error{std::format(
77 "Incorrect ClusterMessage constructor used for message type {}",
78 mType)};
79 }
80}
81
83 switch (mType) {
84 case ClusterMessageType::RequestEvent:
85 case ClusterMessageType::EventsDone:
86 case ClusterMessageType::EmergencyStop:
87 // OK
88 break;
89 default:
90 throw std::logic_error{std::format(
91 "Incorrect ClusterMessage constructor used for message type {}",
92 mType)};
93 }
94}
95
97
99 const auto& [header, body] = wire_msg;
100 messageType = static_cast<ClusterMessageType>(header[0]);
101 source = header[1];
102 if (body.has_value()) {
103 const auto& body_2 = *body;
104 if (messageType == ClusterMessageType::Data) {
105 payload = DataDescr(body_2);
106 } else {
107 WorkerStatus status{};
108 status.status = StatusCode(body_2[0]);
109 status.createdEvents = body_2[1];
110 status.finishedEvents = body_2[2];
111 status.skippedEvents = body_2[3];
112 payload = status;
113 }
114 } else {
115 if (header[2] >= 0) {
116 payload = header[2];
117 }
118 }
119}
120
122 constexpr int max_tag = 16383;
123 constexpr std::uint64_t lower32 = 0xFFFFFFFF;
124
125 static thread_local int next_msg =
126 1; // This is only ever called from one thread per process
128 header[0] = int(messageType);
129 header[1] = source;
130 if (payload.index() == 3) {
131 next_msg = (next_msg % max_tag) + 1;
132 header[2] = next_msg;
133 WireMsgBody body{};
134 const auto& payload_local = std::get<DataDescr>(payload);
135 body[0] = int(std::uint64_t(payload_local.ptr) >> 32);
136 body[1] = int(std::uint64_t(payload_local.ptr) & lower32);
137 body[2] = int(std::uint64_t(payload_local.len) >> 32);
138 body[3] = int(std::uint64_t(payload_local.len) & lower32);
139 body[4] = int(std::uint64_t(payload_local.align) >> 32);
140 body[5] = int(std::uint64_t(payload_local.align) & lower32);
141 body[6] = int(std::uint64_t(payload_local.evtNumber) >> 32);
142 body[7] = int(std::uint64_t(payload_local.evtNumber) & lower32);
143 body[8] = int(std::uint64_t(payload_local.fileNumber) >> 32);
144 body[9] = int(std::uint64_t(payload_local.fileNumber) & lower32);
145 WireMsg msg{header, std::make_optional(body)};
146 return msg;
147 }
148 if (payload.index() == 2) {
149 next_msg = (next_msg % max_tag) + 1;
150 header[2] = next_msg;
151 WireMsgBody body{};
152 const auto& payload_local = std::get<WorkerStatus>(payload);
153 body[0] = static_cast<int>(payload_local.status.getCode());
154 body[1] = payload_local.createdEvents;
155 body[2] = payload_local.finishedEvents;
156 body[3] = payload_local.skippedEvents;
157 body[4] = body[5] = body[6] = body[7] = body[8] = body[9] = 0;
158 WireMsg msg{header, std::make_optional(body)};
159 return msg;
160 }
161 // else
162 if (payload.index() == 1) { // if we have an int payload
163 header[2] = std::get<int>(payload);
164 } else {
165 header[2] = -1;
166 }
167 WireMsg msg{header, std::nullopt};
168 return msg;
169}
ClusterMessageType
STL namespace.
DataDescr(const T *ptr, std::size_t count=1)
DataDescr & operator=(const DataDescr &)=delete
ClusterMessageType messageType
Payload_t payload
std::array< int, 10 > WireMsgBody
std::tuple< WireMsgHdr, std::optional< WireMsgBody > > WireMsg
WireMsg wire_msg() const
std::array< int, 3 > WireMsgHdr
MsgStream & msg
Definition testRead.cxx:32