128 {
129
130 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
134 if (!this->cleanUp(outputConnection).isSuccess()) {
136 return(StatusCode::FAILURE);
137 }
138 return(StatusCode::SUCCESS);
139 }
141 ATH_MSG_DEBUG(
"commitOutput SKIPPED for uninitialized server.");
142 return(StatusCode::SUCCESS);
143 }
144 std::map<void*, RootType> commitCache;
147
148 const char* placementStr = nullptr;
151 if (
sc.isSuccess() && placementStr !=
nullptr && strlen(placementStr) > 6 && num > 0) {
152 const char * matchedChars = strstr(placementStr, "[FILE=");
153 if (!matchedChars){
154 ATH_MSG_ERROR(std::format(
"No matching filename in {}", placementStr));
156 }
160 ATH_MSG_ERROR(std::format(
"Failed to connectOutput for {}", fileName));
162 }
164 bool dataHeaderSeen = false;
165 std::string dataHeaderID;
166 while (num > 0) {
167 std::string objName = "ALL";
168 if (useDetailChronoStat()) {
169 objName = placementStr;
170 }
171
172 {
173 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
174 std::string_view pStr = placementStr;
175 std::string::size_type cpos = pStr.find ("[CONT=");
176 if (cpos == std::string::npos) {
177 ATH_MSG_ERROR(std::format(
"No CONT field in placement string: {}", pStr));
178 return StatusCode::FAILURE;
179 }
180 std::string tokenStr (pStr.substr(0, cpos));
181 std::string contName (pStr.substr(cpos, std::string::npos));
182 std::string::size_type cl1 = contName.find(']');
183 if (cl1 == std::string::npos) {
184 ATH_MSG_ERROR(std::format(
"Missing close bracket after CONT field in placement string: {}", pStr));
185 return StatusCode::FAILURE;
186 }
187 tokenStr.append(contName, cl1 + 1);
188 contName = contName.substr(6, cl1 - 6);
189
190 std::string::size_type ppos = pStr.find ("[PNAME=");
191 if (ppos == std::string::npos) {
192 ATH_MSG_ERROR(std::format(
"No PNAME field in placement string: {}", pStr));
193 return StatusCode::FAILURE;
194 }
195 std::string className (pStr.substr(ppos, std::string::npos));
196 std::string::size_type cl2 = className.find(']');
197 if (cl2 == std::string::npos) {
198 ATH_MSG_ERROR(std::format(
"Missing close bracket after PNAME field in placement string: {}", pStr));
199 return StatusCode::FAILURE;
200 }
201 className = className.substr(7, cl2 - 7);
204 const std::string numStr = std::to_string(num);
206 bool foundContainer = false;
207 std::size_t opPos = contName.find('(');
209 foundContainer = true;
210 } else {
212 if (contName.compare(0, opPos, item) == 0){
213 foundContainer = true;
215 break;
216 }
217 }
218 }
219 if (len > 0 && foundContainer && contName[len] == '(' ) {
220 ServiceHandle<IIncidentSvc> incSvc(
"IncidentSvc",
name());
221
225 FileIncident beginInputIncident(
name(),
"BeginInputMemFile", memName);
226 incSvc->fireIncident(beginInputIncident);
227 FileIncident endInputIncident(
name(),
"EndInputMemFile", std::move(memName));
228 incSvc->fireIncident(endInputIncident);
229 }
231 }
232
233 ServiceHandle<IAthMetaDataSvc> metadataSvc(
"MetaDataSvc",
name());
235 sc = metadataSvc->shmProxy(std::format(
"{}[NUM={}]", pStr, numStr));
236 if (
sc.isRecoverable()) {
238 }
else if (
sc.isFailure()) {
241 }
242 } else {
243 Token readToken;
244 readToken.
setOid(Token::OID_t(num, 0));
248
249 if( m_oneDataHeaderForm.value() ) {
250 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
251 if( className == "DataHeaderForm_p6" ) {
252
254 "", placementWithSwn());
255 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
256 tokenStr = "";
257 } else {
258 Placement placement;
261 if (token == nullptr) {
264 }
265 tokenStr = token->toString();
266 }
267 if( className == "DataHeader_p6" ) {
268
270 tokenStr, placementWithSwn());
271 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
274 }
275 } else
276 if (className !=
"Token" && className !=
"DataHeaderForm_p6" && !classDesc.
IsFundamental()) {
277 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
278 }
279 placementStr = nullptr;
280 } else {
281
282 Placement placement;
283 placement.
fromString(placementStr); placementStr =
nullptr;
285 if (token == nullptr) {
288 }
289 tokenStr = token->toString();
290 if (className == "DataHeader_p6") {
291
294
295 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
298 }
299 dataHeaderSeen = true;
300
301
302
303
304
305
306 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
307 } else if (dataHeaderSeen) {
308 dataHeaderSeen = false;
309
310
311 if (className == "DataHeaderForm_p6") {
312
314 tokenStr, dataHeaderID);
315 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
316 ATH_MSG_ERROR(
"Failed updateRepRefs for obj = " << tokenStr);
318 }
319 } else {
320
321 GenericAddress address(0, 0, "", dataHeaderID);
322 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
325 }
326 }
327 }
328 if (className !=
"Token" && className !=
"DataHeaderForm_p6" && !classDesc.
IsFundamental()) {
329 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
330 }
331 }
332 }
333 }
334
336 while (
sc.isRecoverable()) {
338 }
339 if (!
sc.isSuccess()) {
342 }
343 }
345 while (
sc.isRecoverable()) {
347 }
348 if (
sc.isFailure()) {
349
351 }
352 }
353 if (dataHeaderSeen) {
354
355 GenericAddress address(0, 0, "", std::move(dataHeaderID));
356 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
359 }
360 }
361 placementStr = nullptr;
362 }
else if (
sc.isSuccess() && placementStr !=
nullptr && strncmp(placementStr,
"stop", 4) == 0) {
363 return(StatusCode::RECOVERABLE);
364 }
else if (
sc.isRecoverable() || num == -1) {
365 return(StatusCode::RECOVERABLE);
366 }
368 ServiceHandle<IIncidentSvc> incSvc(
"IncidentSvc",
name());
370 FileIncident beginInputIncident(
name(),
"BeginInputMemFile", memName);
371 incSvc->fireIncident(beginInputIncident);
372 FileIncident endInputIncident(
name(),
"EndInputMemFile", std::move(memName));
373 incSvc->fireIncident(endInputIncident);
374 if (
sc.isFailure()) {
375 ATH_MSG_INFO(
"All SharedWriter clients stopped - exiting");
376 } else {
378 }
379 return(StatusCode::FAILURE);
380 }
381 }
383 ATH_MSG_DEBUG(std::format(
"commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
384 return(StatusCode::SUCCESS);
385 }
386 if (outputConnection.empty()) {
387 outputConnection = std::move(fileName);
388 } else {
389 outputConnection = outputConnectionSpec;
392 }
393 }
395 for (auto& [ptr, rootType] : commitCache) {
397 }
398 return(status);
399}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
virtual void setObjPtr(void *&obj, const Token *token) override
bool m_streamServerActive
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
const std::string & auxString() const
Access auxiliary string.
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Bool_t IsFundamental() const
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Token & setOid(const OID_t &oid)
Set object identifier.
static const DbType POOL_StorageType
char rootType(char typeidType)
This function is used internally in the code when creating primitive dynamic auxiliary branches.