ATLAS Offline Software
Loading...
Searching...
No Matches
ClusterMessage Class Reference

A class describing a message sent between nodes in a cluster. More...

#include <ClusterMessage.h>

Collaboration diagram for ClusterMessage:

Classes

struct  DataDescr
struct  WorkerStatus

Public Types

using WireMsgHdr = std::array<int, 3>
using WireMsgBody = std::array<int, 10>
using WireMsg = std::tuple<WireMsgHdr, std::optional<WireMsgBody>>
using Payload_t = std::variant<std::monostate, int, WorkerStatus, DataDescr>

Public Member Functions

 ClusterMessage ()
 ClusterMessage (ClusterMessageType mType)
 ClusterMessage (ClusterMessageType mType, int payload)
 ClusterMessage (ClusterMessageType mType, WorkerStatus payload)
 ClusterMessage (ClusterMessageType mType, DataDescr &&payload)
 ClusterMessage (const WireMsg &)
WireMsg wire_msg () const

Public Attributes

int source = -1
ClusterMessageType messageType {ClusterMessageType::EMPTY}
Payload_t payload {}

Detailed Description

A class describing a message sent between nodes in a cluster.

Definition at line 30 of file ClusterMessage.h.

Member Typedef Documentation

◆ Payload_t

using ClusterMessage::Payload_t = std::variant<std::monostate, int, WorkerStatus, DataDescr>

Definition at line 77 of file ClusterMessage.h.

◆ WireMsg

using ClusterMessage::WireMsg = std::tuple<WireMsgHdr, std::optional<WireMsgBody>>

Definition at line 38 of file ClusterMessage.h.

◆ WireMsgBody

using ClusterMessage::WireMsgBody = std::array<int, 10>

Definition at line 37 of file ClusterMessage.h.

◆ WireMsgHdr

using ClusterMessage::WireMsgHdr = std::array<int, 3>

Definition at line 36 of file ClusterMessage.h.

Constructor & Destructor Documentation

◆ ClusterMessage() [1/6]

ClusterMessage::ClusterMessage ( )
default

◆ ClusterMessage() [2/6]

ClusterMessage::ClusterMessage ( ClusterMessageType mType)

Definition at line 79 of file ClusterMessage.cxx.

79 : messageType(mType) {
80 switch (mType) {
81 case ClusterMessageType::RequestEvent:
82 case ClusterMessageType::EventsDone:
83 case ClusterMessageType::EmergencyStop:
84 // OK
85 break;
86 default:
87 throw std::logic_error{std::format(
88 "Incorrect ClusterMessage constructor used for message type {}",
89 mType)};
90 }
91}
ClusterMessageType messageType

◆ ClusterMessage() [3/6]

ClusterMessage::ClusterMessage ( ClusterMessageType mType,
int payload )

Definition at line 61 of file ClusterMessage.cxx.

62 : messageType(mType), payload(payload) {
63 if (mType != ClusterMessageType::ProvideEvent) {
64 throw std::logic_error{std::format(
65 "Incorrect ClusterMessage constructor used for message type {}",
66 mType)};
67 }
68}
Payload_t payload

◆ ClusterMessage() [4/6]

ClusterMessage::ClusterMessage ( ClusterMessageType mType,
WorkerStatus payload )

Definition at line 51 of file ClusterMessage.cxx.

52 : messageType(mType), payload(payload) {
53 if (mType != ClusterMessageType::FinalWorkerStatus &&
54 mType != ClusterMessageType::WorkerError) {
55 throw std::logic_error{std::format(
56 "Incorrect ClusterMessage constructor used for message type {}",
57 mType)};
58 }
59}

◆ ClusterMessage() [5/6]

ClusterMessage::ClusterMessage ( ClusterMessageType mType,
DataDescr && payload )

Definition at line 70 of file ClusterMessage.cxx.

71 : messageType(mType), payload(std::move(payload)) {
72 if (mType != ClusterMessageType::Data) {
73 throw std::logic_error{std::format(
74 "Incorrect ClusterMessage constructor used for message type {}",
75 mType)};
76 }
77}

◆ ClusterMessage() [6/6]

ClusterMessage::ClusterMessage ( const WireMsg & wire_msg)

Definition at line 95 of file ClusterMessage.cxx.

95 {
96 const auto& [header, body] = wire_msg;
97 messageType = static_cast<ClusterMessageType>(header[0]);
98 source = header[1];
99 if (body.has_value()) {
100 const auto& body_2 = *body;
101 if (messageType == ClusterMessageType::Data) {
102 payload = DataDescr(body_2);
103 } else {
104 WorkerStatus status;
105 status.status = StatusCode(body_2[0]);
106 status.createdEvents = body_2[1];
107 status.finishedEvents = body_2[2];
108 status.skippedEvents = body_2[3];
109 payload = status;
110 }
111 } else {
112 if (header[2] >= 0) {
113 payload = header[2];
114 }
115 }
116}
ClusterMessageType
::StatusCode StatusCode
StatusCode definition for legacy code.
status
Definition merge.py:16
WireMsg wire_msg() const

Member Function Documentation

◆ wire_msg()

ClusterMessage::WireMsg ClusterMessage::wire_msg ( ) const
nodiscard

Definition at line 118 of file ClusterMessage.cxx.

118 {
119 constexpr int max_tag = 16383;
120 constexpr std::uint64_t lower32 = 0xFFFFFFFF;
121
122 static thread_local int next_msg =
123 1; // This is only ever called from one thread per process
124 WireMsgHdr header{};
125 header[0] = int(messageType);
126 header[1] = source;
127 if (payload.index() == 3) {
128 next_msg = (next_msg % max_tag) + 1;
129 header[2] = next_msg;
130 WireMsgBody body{};
131 const auto& payload_local = std::get<DataDescr>(payload);
132 body[0] = int(std::uint64_t(payload_local.ptr) >> 32);
133 body[1] = int(std::uint64_t(payload_local.ptr) & lower32);
134 body[2] = int(std::uint64_t(payload_local.len) >> 32);
135 body[3] = int(std::uint64_t(payload_local.len) & lower32);
136 body[4] = int(std::uint64_t(payload_local.align) >> 32);
137 body[5] = int(std::uint64_t(payload_local.align) & lower32);
138 body[6] = int(std::uint64_t(payload_local.evtNumber) >> 32);
139 body[7] = int(std::uint64_t(payload_local.evtNumber) & lower32);
140 body[8] = int(std::uint64_t(payload_local.fileNumber) >> 32);
141 body[9] = int(std::uint64_t(payload_local.fileNumber) & lower32);
142 WireMsg msg{header, std::make_optional(body)};
143 return msg;
144 }
145 if (payload.index() == 2) {
146 next_msg = (next_msg % max_tag) + 1;
147 header[2] = next_msg;
148 WireMsgBody body{};
149 const auto& payload_local = std::get<WorkerStatus>(payload);
150 body[0] = static_cast<int>(payload_local.status.getCode());
151 body[1] = payload_local.createdEvents;
152 body[2] = payload_local.finishedEvents;
153 body[3] = payload_local.skippedEvents;
154 body[4] = body[5] = body[6] = body[7] = body[8] = body[9] = 0;
155 WireMsg msg{header, std::make_optional(body)};
156 return msg;
157 }
158 // else
159 if (payload.index() == 1) { // if we have an int payload
160 header[2] = std::get<int>(payload);
161 } else {
162 header[2] = -1;
163 }
164 WireMsg msg{header, std::nullopt};
165 return msg;
166}
std::array< int, 10 > WireMsgBody
std::tuple< WireMsgHdr, std::optional< WireMsgBody > > WireMsg
std::array< int, 3 > WireMsgHdr
MsgStream & msg
Definition testRead.cxx:32

Member Data Documentation

◆ messageType

ClusterMessageType ClusterMessage::messageType {ClusterMessageType::EMPTY}

Definition at line 41 of file ClusterMessage.h.

41{ClusterMessageType::EMPTY};

◆ payload

Payload_t ClusterMessage::payload {}

Definition at line 78 of file ClusterMessage.h.

78{};

◆ source

int ClusterMessage::source = -1

Definition at line 40 of file ClusterMessage.h.


The documentation for this class was generated from the following files: