128 {
129
130 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
134 if (!this->
cleanUp(outputConnection).isSuccess()) {
136 return(StatusCode::FAILURE);
137 }
138 return(StatusCode::SUCCESS);
139 }
141 ATH_MSG_DEBUG(
"commitOutput SKIPPED for uninitialized server.");
142 return(StatusCode::SUCCESS);
143 }
144 std::map<void*, RootType> commitCache;
147
148 const char* placementStr = nullptr;
151 if (
sc.isSuccess() && placementStr !=
nullptr && strlen(placementStr) > 6 && num > 0) {
152 const char * matchedChars = strstr(placementStr, "[FILE=");
153 if (!matchedChars){
154 ATH_MSG_ERROR(std::format(
"No matching filename in {}", placementStr));
156 }
160 ATH_MSG_ERROR(std::format(
"Failed to connectOutput for {}", fileName));
162 }
164 bool dataHeaderSeen = false;
165 std::string dataHeaderID;
166 while (num > 0) {
167 std::string objName = "ALL";
168 if (useDetailChronoStat()) {
169 objName = placementStr;
170 }
171
172 {
173 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
174 std::string_view pStr = placementStr;
175 std::string::size_type cpos = pStr.find ("[CONT=");
176 if (cpos == std::string::npos) {
177 ATH_MSG_ERROR(std::format(
"No CONT field in placement string: {}", pStr));
178 return StatusCode::FAILURE;
179 }
180 std::string tokenStr (pStr.substr(0, cpos));
181 std::string contName (pStr.substr(cpos, std::string::npos));
182 std::string::size_type cl1 = contName.find(']');
183 if (cl1 == std::string::npos) {
184 ATH_MSG_ERROR(std::format(
"Missing close bracket after CONT field in placement string: {}", pStr));
185 return StatusCode::FAILURE;
186 }
187 tokenStr.append(contName, cl1 + 1);
188 contName = contName.substr(6, cl1 - 6);
189
190 std::string::size_type ppos = pStr.find ("[PNAME=");
191 if (ppos == std::string::npos) {
192 ATH_MSG_ERROR(std::format(
"No PNAME field in placement string: {}", pStr));
193 return StatusCode::FAILURE;
194 }
195 std::string className (pStr.substr(ppos, std::string::npos));
196 std::string::size_type cl2 = className.find(']');
197 if (cl2 == std::string::npos) {
198 ATH_MSG_ERROR(std::format(
"Missing close bracket after PNAME field in placement string: {}", pStr));
199 return StatusCode::FAILURE;
200 }
201 className = className.substr(7, cl2 - 7);
204 const std::string numStr = std::to_string(num);
206 bool foundContainer = false;
207 std::size_t opPos = contName.find('(');
209 foundContainer = true;
210 } else {
212 if (contName.compare(0, opPos, item) == 0){
213 foundContainer = true;
214 len = item.size();
215 break;
216 }
217 }
218 }
219 if (len > 0 && foundContainer && contName[len] == '(' ) {
220 ServiceHandle<IIncidentSvc> incSvc(
"IncidentSvc",
name());
221
226 memName, {}, memName,
227 "BeginInputMemFile", "EndInputMemFile");
228 }
230 }
231
232 ServiceHandle<IAthMetaDataSvc> metadataSvc(
"MetaDataSvc",
name());
234 sc = metadataSvc->shmProxy(std::format(
"{}[NUM={}]", pStr, numStr));
235 if (
sc.isRecoverable()) {
237 }
else if (
sc.isFailure()) {
240 }
241 } else {
242 Token readToken;
243 readToken.
setOid(Token::OID_t(num, 0));
247
248 if( m_oneDataHeaderForm.value() ) {
249 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
250 if( className == "DataHeaderForm_p6" ) {
251
253 "", placementWithSwn());
254 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
255 tokenStr = "";
256 } else {
257 Placement placement;
260 if (token == nullptr) {
263 }
264 tokenStr = token->toString();
265 }
266 if( className == "DataHeader_p6" ) {
267
269 tokenStr, placementWithSwn());
270 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
273 }
274 } else
275 if (className !=
"Token" && className !=
"DataHeaderForm_p6" && !classDesc.
IsFundamental()) {
276 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
277 }
278 placementStr = nullptr;
279 } else {
280
281 Placement placement;
282 placement.
fromString(placementStr); placementStr =
nullptr;
284 if (token == nullptr) {
287 }
288 tokenStr = token->toString();
289 if (className == "DataHeader_p6") {
290
293
294 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
297 }
298 dataHeaderSeen = true;
299
300
301
302
303
304
305 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
306 } else if (dataHeaderSeen) {
307 dataHeaderSeen = false;
308
309
310 if (className == "DataHeaderForm_p6") {
311
313 tokenStr, dataHeaderID);
314 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
315 ATH_MSG_ERROR(
"Failed updateRepRefs for obj = " << tokenStr);
317 }
318 } else {
319
320 GenericAddress address(0, 0, "", dataHeaderID);
321 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
324 }
325 }
326 }
327 if (className !=
"Token" && className !=
"DataHeaderForm_p6" && !classDesc.
IsFundamental()) {
328 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
329 }
330 }
331 }
332 }
333
335 while (
sc.isRecoverable()) {
337 }
338 if (!
sc.isSuccess()) {
341 }
342 }
344 while (
sc.isRecoverable()) {
346 }
347 if (
sc.isFailure()) {
348
350 }
351 }
352 if (dataHeaderSeen) {
353
354 GenericAddress address(0, 0, "", std::move(dataHeaderID));
355 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
358 }
359 }
360 placementStr = nullptr;
361 }
else if (
sc.isSuccess() && placementStr !=
nullptr && strncmp(placementStr,
"stop", 4) == 0) {
362 return(StatusCode::RECOVERABLE);
363 }
else if (
sc.isRecoverable() || num == -1) {
364 return(StatusCode::RECOVERABLE);
365 }
367 ServiceHandle<IIncidentSvc> incSvc(
"IncidentSvc",
name());
369 {
371 memName, {}, memName,
372 "BeginInputMemFile", "EndInputMemFile");
373 }
374 if (
sc.isFailure()) {
375 ATH_MSG_INFO(
"All SharedWriter clients stopped - exiting");
376 } else {
378 }
379 return(StatusCode::FAILURE);
380 }
381 }
383 ATH_MSG_DEBUG(std::format(
"commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
384 return(StatusCode::SUCCESS);
385 }
386 if (outputConnection.empty()) {
387 outputConnection = std::move(fileName);
388 } else {
389 outputConnection = outputConnectionSpec;
392 }
393 }
394 std::size_t
merge = outputConnection.find(
"?pmerge=");
395 const std::string baseOutputConnection = outputConnection.substr(0, merge);
401 doCommit = true;
403 }
405 for (auto& [ptr, rootType] : commitCache) {
407 }
408 return(status);
409}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
bool merge(const StringPool &other)
Merge another pool into this one.
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
std::map< std::string, int > m_fileCommitCounter
Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() function.
virtual void setObjPtr(void *&obj, const Token *token) override
bool m_streamServerActive
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: To use MetadataSvc to merge data placed in a certain container.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only, event data is collected and send via ROOT TMemFile.
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
const std::string & auxString() const
Access auxiliary string.
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Bool_t IsFundamental() const
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Token & setOid(const OID_t &oid)
Set object identifier.
static const DbType POOL_StorageType
char rootType(char typeidType)
This function is used internally in the code when creating primitive dynamic auxiliary branches.
static constexpr CLID ID()