131 {
132
133 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
137 if (!this->
cleanUp(outputConnection).isSuccess()) {
139 return(StatusCode::FAILURE);
140 }
141 return(StatusCode::SUCCESS);
142 }
144 ATH_MSG_DEBUG(
"commitOutput SKIPPED for uninitialized server.");
145 return(StatusCode::SUCCESS);
146 }
147 std::map<void*, RootType> commitCache;
150
151 const char* placementStr = nullptr;
154 if (
sc.isSuccess() && placementStr !=
nullptr && strlen(placementStr) > 6 && num > 0) {
155 const char * matchedChars = strstr(placementStr, "[FILE=");
156 if (!matchedChars){
157 ATH_MSG_ERROR(std::format(
"No matching filename in {}", placementStr));
159 }
163 ATH_MSG_ERROR(std::format(
"Failed to connectOutput for {}", fileName));
165 }
167 bool dataHeaderSeen = false;
168 std::string dataHeaderID;
169 while (num > 0) {
170 std::string objName = "ALL";
171 if (useDetailChronoStat()) {
172 objName = placementStr;
173 }
174
175 {
176 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
177 std::string_view pStr = placementStr;
178 std::string::size_type cpos = pStr.find ("[CONT=");
179 if (cpos == std::string::npos) {
180 ATH_MSG_ERROR(std::format(
"No CONT field in placement string: {}", pStr));
181 return StatusCode::FAILURE;
182 }
183 std::string tokenStr (pStr.substr(0, cpos));
184 std::string contName (pStr.substr(cpos, std::string::npos));
185 std::string::size_type cl1 = contName.find(']');
186 if (cl1 == std::string::npos) {
187 ATH_MSG_ERROR(std::format(
"Missing close bracket after CONT field in placement string: {}", pStr));
188 return StatusCode::FAILURE;
189 }
190 tokenStr.append(contName, cl1 + 1);
191 contName = contName.substr(6, cl1 - 6);
192
193 std::string::size_type ppos = pStr.find ("[PNAME=");
194 if (ppos == std::string::npos) {
195 ATH_MSG_ERROR(std::format(
"No PNAME field in placement string: {}", pStr));
196 return StatusCode::FAILURE;
197 }
198 std::string className (pStr.substr(ppos, std::string::npos));
199 std::string::size_type cl2 = className.find(']');
200 if (cl2 == std::string::npos) {
201 ATH_MSG_ERROR(std::format(
"Missing close bracket after PNAME field in placement string: {}", pStr));
202 return StatusCode::FAILURE;
203 }
204 className = className.substr(7, cl2 - 7);
207 const std::string numStr = std::to_string(num);
209 bool foundContainer = false;
210 std::size_t opPos = contName.find('(');
212 foundContainer = true;
213 } else {
215 if (contName.compare(0, opPos, item) == 0){
216 foundContainer = true;
217 len = item.size();
218 break;
219 }
220 }
221 }
222 if (len > 0 && foundContainer && contName[len] == '(' ) {
223 ServiceHandle<IIncidentSvc> incSvc(
"IncidentSvc",
name());
224
229 memName, {}, memName,
230 "BeginInputMemFile", "EndInputMemFile");
231 }
233 }
234
235 ServiceHandle<IAthMetaDataSvc> metadataSvc(
"MetaDataSvc",
name());
237 sc = metadataSvc->shmProxy(std::format(
"{}[NUM={}]", pStr, numStr));
238 if (
sc.isRecoverable()) {
240 }
else if (
sc.isFailure()) {
243 }
244 } else {
245 Token readToken;
246 readToken.
setOid(Token::OID_t(num, 0));
250
251 if( m_oneDataHeaderForm.value() ) {
252 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
253 if( className == "DataHeaderForm_p6" ) {
254
256 "", placementWithSwn());
257 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
258 tokenStr = "";
259 } else {
260 Placement placement;
263 if (token == nullptr) {
266 }
267 tokenStr = token->toString();
268 }
269 if( className == "DataHeader_p6" ) {
270
272 tokenStr, placementWithSwn());
273 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
276 }
277 } else
278 if (className !=
"Token" && className !=
"DataHeaderForm_p6" && !classDesc.
IsFundamental()) {
279 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
280 }
281 placementStr = nullptr;
282 } else {
283
284 Placement placement;
285 placement.
fromString(placementStr); placementStr =
nullptr;
287 if (token == nullptr) {
290 }
291 tokenStr = token->toString();
292 if (className == "DataHeader_p6") {
293
296
297 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
300 }
301 dataHeaderSeen = true;
302
303
304
305
306
307
308 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
309 } else if (dataHeaderSeen) {
310 dataHeaderSeen = false;
311
312
313 if (className == "DataHeaderForm_p6") {
314
316 tokenStr, dataHeaderID);
317 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
318 ATH_MSG_ERROR(
"Failed updateRepRefs for obj = " << tokenStr);
320 }
321 } else {
322
323 GenericAddress address(0, 0, "", dataHeaderID);
324 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
327 }
328 }
329 }
330 if (className !=
"Token" && className !=
"DataHeaderForm_p6" && !classDesc.
IsFundamental()) {
331 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
332 }
333 }
334 }
335 }
336
338 while (
sc.isRecoverable()) {
340 }
341 if (!
sc.isSuccess()) {
344 }
345 }
347 while (
sc.isRecoverable()) {
349 }
350 if (
sc.isFailure()) {
351
353 }
354 }
355 if (dataHeaderSeen) {
356
357 GenericAddress address(0, 0, "", std::move(dataHeaderID));
358 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
361 }
362 }
363 placementStr = nullptr;
364 }
else if (
sc.isSuccess() && placementStr !=
nullptr && strncmp(placementStr,
"stop", 4) == 0) {
365 return(StatusCode::RECOVERABLE);
366 }
else if (
sc.isRecoverable() || num == -1) {
367 return(StatusCode::RECOVERABLE);
368 }
370 ServiceHandle<IIncidentSvc> incSvc(
"IncidentSvc",
name());
372 {
374 memName, {}, memName,
375 "BeginInputMemFile", "EndInputMemFile");
376 }
377 if (
sc.isFailure()) {
378 ATH_MSG_INFO(
"All SharedWriter clients stopped - exiting");
379 } else {
381 }
382 return(StatusCode::FAILURE);
383 }
384 }
386 ATH_MSG_DEBUG(std::format(
"commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
387 return(StatusCode::SUCCESS);
388 }
389 if (outputConnection.empty()) {
390 outputConnection = std::move(fileName);
391 } else {
392 outputConnection = outputConnectionSpec;
395 }
396 }
397 std::size_t
merge = outputConnection.find(
"?pmerge=");
398 const std::string baseOutputConnection = outputConnection.substr(0, merge);
404 doCommit = true;
406 }
408 for (auto& [ptr, rootType] : commitCache) {
410 }
411 return(status);
412}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
bool merge(const StringPool &other)
Merge another pool into this one.
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
std::map< std::string, int > m_fileCommitCounter
Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() functions.
virtual void setObjPtr(void *&obj, const Token *token) override
bool m_streamServerActive
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
const std::string & auxString() const
Access auxiliary string.
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Bool_t IsFundamental() const
Token & setOid(const OID_t &oid)
Set object identifier.
Token & setAuxString(std::string &&auxString)
Set auxiliary string.
static const DbType POOL_StorageType
char rootType(char typeidType)
This function is used internally in the code when creating primitive dynamic auxiliary branches.
static constexpr CLID ID()