ATLAS Offline Software
Loading...
Searching...
No Matches
ClusterMessage Class Reference

A class describing a message sent between nodes in a cluster. More...

#include <ClusterMessage.h>

Collaboration diagram for ClusterMessage:

Classes

struct  DataDescr
struct  WorkerStatus

Public Types

using WireMsgHdr = std::array<int, 3>
using WireMsgBody = std::array<int, 10>
using WireMsg = std::tuple<WireMsgHdr, std::optional<WireMsgBody>>
using Payload_t = std::variant<std::monostate, int, WorkerStatus, DataDescr>

Public Member Functions

 ClusterMessage ()
 ClusterMessage (ClusterMessageType mType)
 ClusterMessage (ClusterMessageType mType, int payload)
 ClusterMessage (ClusterMessageType mType, WorkerStatus payload)
 ClusterMessage (ClusterMessageType mType, DataDescr &&payload)
 ClusterMessage (const WireMsg &)
WireMsg wire_msg () const

Public Attributes

int source = -1
ClusterMessageType messageType {ClusterMessageType::EMPTY}
Payload_t payload {}

Detailed Description

A class describing a message sent between nodes in a cluster.

Definition at line 30 of file ClusterMessage.h.

Member Typedef Documentation

◆ Payload_t

using ClusterMessage::Payload_t = std::variant<std::monostate, int, WorkerStatus, DataDescr>

Definition at line 80 of file ClusterMessage.h.

◆ WireMsg

using ClusterMessage::WireMsg = std::tuple<WireMsgHdr, std::optional<WireMsgBody>>

Definition at line 38 of file ClusterMessage.h.

◆ WireMsgBody

using ClusterMessage::WireMsgBody = std::array<int, 10>

Definition at line 37 of file ClusterMessage.h.

◆ WireMsgHdr

using ClusterMessage::WireMsgHdr = std::array<int, 3>

Definition at line 36 of file ClusterMessage.h.

Constructor & Destructor Documentation

◆ ClusterMessage() [1/6]

ClusterMessage::ClusterMessage ( )
default

◆ ClusterMessage() [2/6]

ClusterMessage::ClusterMessage ( ClusterMessageType mType)

Definition at line 82 of file ClusterMessage.cxx.

82 : messageType(mType) {
83 switch (mType) {
84 case ClusterMessageType::RequestEvent:
85 case ClusterMessageType::EventsDone:
86 case ClusterMessageType::EmergencyStop:
87 // OK
88 break;
89 default:
90 throw std::logic_error{std::format(
91 "Incorrect ClusterMessage constructor used for message type {}",
92 mType)};
93 }
94}
ClusterMessageType messageType

◆ ClusterMessage() [3/6]

ClusterMessage::ClusterMessage ( ClusterMessageType mType,
int payload )

Definition at line 64 of file ClusterMessage.cxx.

65 : messageType(mType), payload(payload) {
66 if (mType != ClusterMessageType::ProvideEvent) {
67 throw std::logic_error{std::format(
68 "Incorrect ClusterMessage constructor used for message type {}",
69 mType)};
70 }
71}
Payload_t payload

◆ ClusterMessage() [4/6]

ClusterMessage::ClusterMessage ( ClusterMessageType mType,
WorkerStatus payload )

Definition at line 54 of file ClusterMessage.cxx.

55 : messageType(mType), payload(payload) {
56 if (mType != ClusterMessageType::FinalWorkerStatus &&
57 mType != ClusterMessageType::WorkerError) {
58 throw std::logic_error{std::format(
59 "Incorrect ClusterMessage constructor used for message type {}",
60 mType)};
61 }
62}

◆ ClusterMessage() [5/6]

ClusterMessage::ClusterMessage ( ClusterMessageType mType,
DataDescr && payload )

Definition at line 73 of file ClusterMessage.cxx.

74 : messageType(mType), payload(std::move(payload)) {
75 if (mType != ClusterMessageType::Data) {
76 throw std::logic_error{std::format(
77 "Incorrect ClusterMessage constructor used for message type {}",
78 mType)};
79 }
80}

◆ ClusterMessage() [6/6]

ClusterMessage::ClusterMessage ( const WireMsg & wire_msg)

Definition at line 98 of file ClusterMessage.cxx.

98 {
99 const auto& [header, body] = wire_msg;
100 messageType = static_cast<ClusterMessageType>(header[0]);
101 source = header[1];
102 if (body.has_value()) {
103 const auto& body_2 = *body;
104 if (messageType == ClusterMessageType::Data) {
105 payload = DataDescr(body_2);
106 } else {
108 status.status = StatusCode(body_2[0]);
109 status.createdEvents = body_2[1];
110 status.finishedEvents = body_2[2];
111 status.skippedEvents = body_2[3];
112 payload = status;
113 }
114 } else {
115 if (header[2] >= 0) {
116 payload = header[2];
117 }
118 }
119}
ClusterMessageType
::StatusCode StatusCode
StatusCode definition for legacy code.
status
Definition merge.py:16
WireMsg wire_msg() const

Member Function Documentation

◆ wire_msg()

ClusterMessage::WireMsg ClusterMessage::wire_msg ( ) const
nodiscard

Definition at line 121 of file ClusterMessage.cxx.

121 {
122 constexpr int max_tag = 16383;
123 constexpr std::uint64_t lower32 = 0xFFFFFFFF;
124
125 static thread_local int next_msg =
126 1; // This is only ever called from one thread per process
128 header[0] = int(messageType);
129 header[1] = source;
130 if (payload.index() == 3) {
131 next_msg = (next_msg % max_tag) + 1;
132 header[2] = next_msg;
133 WireMsgBody body{};
134 const auto& payload_local = std::get<DataDescr>(payload);
135 body[0] = int(std::uint64_t(payload_local.ptr) >> 32);
136 body[1] = int(std::uint64_t(payload_local.ptr) & lower32);
137 body[2] = int(std::uint64_t(payload_local.len) >> 32);
138 body[3] = int(std::uint64_t(payload_local.len) & lower32);
139 body[4] = int(std::uint64_t(payload_local.align) >> 32);
140 body[5] = int(std::uint64_t(payload_local.align) & lower32);
141 body[6] = int(std::uint64_t(payload_local.evtNumber) >> 32);
142 body[7] = int(std::uint64_t(payload_local.evtNumber) & lower32);
143 body[8] = int(std::uint64_t(payload_local.fileNumber) >> 32);
144 body[9] = int(std::uint64_t(payload_local.fileNumber) & lower32);
145 WireMsg msg{header, std::make_optional(body)};
146 return msg;
147 }
148 if (payload.index() == 2) {
149 next_msg = (next_msg % max_tag) + 1;
150 header[2] = next_msg;
151 WireMsgBody body{};
152 const auto& payload_local = std::get<WorkerStatus>(payload);
153 body[0] = static_cast<int>(payload_local.status.getCode());
154 body[1] = payload_local.createdEvents;
155 body[2] = payload_local.finishedEvents;
156 body[3] = payload_local.skippedEvents;
157 body[4] = body[5] = body[6] = body[7] = body[8] = body[9] = 0;
158 WireMsg msg{header, std::make_optional(body)};
159 return msg;
160 }
161 // else
162 if (payload.index() == 1) { // if we have an int payload
163 header[2] = std::get<int>(payload);
164 } else {
165 header[2] = -1;
166 }
167 WireMsg msg{header, std::nullopt};
168 return msg;
169}
std::array< int, 10 > WireMsgBody
std::tuple< WireMsgHdr, std::optional< WireMsgBody > > WireMsg
std::array< int, 3 > WireMsgHdr
MsgStream & msg
Definition testRead.cxx:32

Member Data Documentation

◆ messageType

ClusterMessageType ClusterMessage::messageType {ClusterMessageType::EMPTY}

Definition at line 41 of file ClusterMessage.h.

41{ClusterMessageType::EMPTY};

◆ payload

Payload_t ClusterMessage::payload {}

Definition at line 81 of file ClusterMessage.h.

81{};

◆ source

int ClusterMessage::source = -1

Definition at line 40 of file ClusterMessage.h.


The documentation for this class was generated from the following files: