ATLAS Offline Software
Classes | Public Types | Public Member Functions | Public Attributes | List of all members
ClusterMessage Class Reference

A class describing a message sent between nodes in a cluster. More...

#include <ClusterMessage.h>

Collaboration diagram for ClusterMessage:

Classes

struct  DataDescr
 
struct  WorkerStatus
 

Public Types

using WireMsgHdr = std::array< int, 3 >
 
using WireMsgBody = std::array< int, 10 >
 
using WireMsg = std::tuple< WireMsgHdr, std::optional< WireMsgBody > >
 
using Payload_t = std::variant< std::monostate, int, WorkerStatus, DataDescr >
 

Public Member Functions

 ClusterMessage ()
 
 ClusterMessage (ClusterMessageType mType)
 
 ClusterMessage (ClusterMessageType mType, int payload)
 
 ClusterMessage (ClusterMessageType mType, WorkerStatus payload)
 
 ClusterMessage (ClusterMessageType mType, DataDescr &&payload)
 
 ClusterMessage (const WireMsg &)
 
WireMsg wire_msg () const
 

Public Attributes

int source = -1
 
ClusterMessageType messageType {ClusterMessageType::EMPTY}
 
Payload_t payload {}
 

Detailed Description

A class describing a message sent between nodes in a cluster.

Definition at line 30 of file ClusterMessage.h.

Member Typedef Documentation

◆ Payload_t

using ClusterMessage::Payload_t = std::variant<std::monostate, int, WorkerStatus, DataDescr>

Definition at line 77 of file ClusterMessage.h.

◆ WireMsg

using ClusterMessage::WireMsg = std::tuple<WireMsgHdr, std::optional<WireMsgBody> >

Definition at line 38 of file ClusterMessage.h.

◆ WireMsgBody

using ClusterMessage::WireMsgBody = std::array<int, 10>

Definition at line 37 of file ClusterMessage.h.

◆ WireMsgHdr

using ClusterMessage::WireMsgHdr = std::array<int, 3>

Definition at line 36 of file ClusterMessage.h.

Constructor & Destructor Documentation

◆ ClusterMessage() [1/6]

ClusterMessage::ClusterMessage ( )
default

◆ ClusterMessage() [2/6]

ClusterMessage::ClusterMessage ( ClusterMessageType  mType)

Definition at line 79 of file ClusterMessage.cxx.

79  : messageType(mType) {
80  switch (mType) {
84  // OK
85  break;
86  default:
87  throw std::logic_error{std::format(
88  "Incorrect ClusterMessage constructor used for message type {}",
89  mType)};
90  }
91 }

◆ ClusterMessage() [3/6]

ClusterMessage::ClusterMessage ( ClusterMessageType  mType,
int  payload 
)

Definition at line 61 of file ClusterMessage.cxx.

62  : messageType(mType), payload(payload) {
63  if (mType != ClusterMessageType::ProvideEvent) {
64  throw std::logic_error{std::format(
65  "Incorrect ClusterMessage constructor used for message type {}",
66  mType)};
67  }
68 }

◆ ClusterMessage() [4/6]

ClusterMessage::ClusterMessage ( ClusterMessageType  mType,
WorkerStatus  payload 
)

Definition at line 51 of file ClusterMessage.cxx.

52  : messageType(mType), payload(payload) {
55  throw std::logic_error{std::format(
56  "Incorrect ClusterMessage constructor used for message type {}",
57  mType)};
58  }
59 }

◆ ClusterMessage() [5/6]

ClusterMessage::ClusterMessage ( ClusterMessageType  mType,
DataDescr &&  payload 
)

Definition at line 70 of file ClusterMessage.cxx.

71  : messageType(mType), payload(std::move(payload)) {
72  if (mType != ClusterMessageType::Data) {
73  throw std::logic_error{std::format(
74  "Incorrect ClusterMessage constructor used for message type {}",
75  mType)};
76  }
77 }

◆ ClusterMessage() [6/6]

ClusterMessage::ClusterMessage ( const WireMsg &  wire_msg)

Definition at line 95 of file ClusterMessage.cxx.

95  {
96  const auto& [header, body] = wire_msg;
97  messageType = static_cast<ClusterMessageType>(header[0]);
98  source = header[1];
99  if (body.has_value()) {
100  const auto& body_2 = *body;
102  payload = DataDescr(body_2);
103  } else {
104  WorkerStatus status{};
105  status.status = StatusCode(body_2[0]);
106  status.createdEvents = body_2[1];
107  status.finishedEvents = body_2[2];
108  status.skippedEvents = body_2[3];
109  payload = status;
110  }
111  } else {
112  if (header[2] >= 0) {
113  payload = header[2];
114  }
115  }
116 }

Member Function Documentation

◆ wire_msg()

ClusterMessage::WireMsg ClusterMessage::wire_msg ( ) const

Definition at line 118 of file ClusterMessage.cxx.

118  {
119  constexpr int max_tag = 16383;
120  constexpr std::uint64_t lower32 = 0xFFFFFFFF;
121 
122  static thread_local int next_msg =
123  1; // This is only ever called from one thread per process
124  WireMsgHdr header{};
125  header[0] = int(messageType);
126  header[1] = source;
127  if (payload.index() == 3) {
128  next_msg = (next_msg % max_tag) + 1;
129  header[2] = next_msg;
130  WireMsgBody body{};
131  const auto& payload_local = std::get<DataDescr>(payload);
132  body[0] = int(std::uint64_t(payload_local.ptr) >> 32);
133  body[1] = int(std::uint64_t(payload_local.ptr) & lower32);
134  body[2] = int(std::uint64_t(payload_local.len) >> 32);
135  body[3] = int(std::uint64_t(payload_local.len) & lower32);
136  body[4] = int(std::uint64_t(payload_local.align) >> 32);
137  body[5] = int(std::uint64_t(payload_local.align) & lower32);
138  body[6] = int(std::uint64_t(payload_local.evtNumber) >> 32);
139  body[7] = int(std::uint64_t(payload_local.evtNumber) & lower32);
140  body[8] = int(std::uint64_t(payload_local.fileNumber) >> 32);
141  body[9] = int(std::uint64_t(payload_local.fileNumber) & lower32);
142  WireMsg msg{header, std::make_optional(body)};
143  return msg;
144  }
145  if (payload.index() == 2) {
146  next_msg = (next_msg % max_tag) + 1;
147  header[2] = next_msg;
148  WireMsgBody body{};
149  const auto& payload_local = std::get<WorkerStatus>(payload);
150  body[0] = static_cast<int>(payload_local.status.getCode());
151  body[1] = payload_local.createdEvents;
152  body[2] = payload_local.finishedEvents;
153  body[3] = payload_local.skippedEvents;
154  body[4] = body[5] = body[6] = body[7] = body[8] = body[9] = 0;
155  WireMsg msg{header, std::make_optional(body)};
156  return msg;
157  }
158  // else
159  if (payload.index() == 1) { // if we have an int payload
160  header[2] = std::get<int>(payload);
161  } else {
162  header[2] = -1;
163  }
164  WireMsg msg{header, std::nullopt};
165  return msg;
166 }

Member Data Documentation

◆ messageType

ClusterMessageType ClusterMessage::messageType {ClusterMessageType::EMPTY}

Definition at line 41 of file ClusterMessage.h.

◆ payload

Payload_t ClusterMessage::payload {}

Definition at line 78 of file ClusterMessage.h.

◆ source

int ClusterMessage::source = -1

Definition at line 40 of file ClusterMessage.h.


The documentation for this class was generated from the following files:
ClusterMessage.h
ClusterMessage.cxx
ClusterMessage::payload
Payload_t payload
Definition: ClusterMessage.h:78
ClusterMessageType::Data
@ Data
header
Definition: hcg.cxx:526
vtune_athena.format
format
Definition: vtune_athena.py:14
ClusterMessageType::RequestEvent
@ RequestEvent
ClusterMessageType
An enum class defining what type of message this is.
ClusterMessageType::EmergencyStop
@ EmergencyStop
ClusterMessage::WireMsgBody
std::array< int, 10 > WireMsgBody
Definition: ClusterMessage.h:37
EL::StatusCode
::StatusCode StatusCode
StatusCode definition for legacy code.
Definition: PhysicsAnalysis/D3PDTools/EventLoop/EventLoop/StatusCode.h:22
xAOD::uint64_t
uint64_t
Definition: EventInfo_v1.cxx:123
ClusterMessageType::FinalWorkerStatus
@ FinalWorkerStatus
ClusterMessage::wire_msg
WireMsg wire_msg() const
Definition: ClusterMessage.cxx:118
ClusterMessage::messageType
ClusterMessageType messageType
Definition: ClusterMessage.h:41
ClusterMessage::source
int source
Definition: ClusterMessage.h:40
ClusterMessageType::WorkerError
@ WorkerError
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
ClusterMessage::WireMsgHdr
std::array< int, 3 > WireMsgHdr
Definition: ClusterMessage.h:36
ClusterMessageType::EventsDone
@ EventsDone
ClusterMessage::WireMsg
std::tuple< WireMsgHdr, std::optional< WireMsgBody > > WireMsg
Definition: ClusterMessage.h:38
merge.status
status
Definition: merge.py:16
makeTOC.header
header
Definition: makeTOC.py:28
ClusterMessageType::ProvideEvent
@ ProvideEvent
python.AutoConfigFlags.msg
msg
Definition: AutoConfigFlags.py:7