127 {
128
129 std::string outputConnection = outputConnectionSpec.substr(0, outputConnectionSpec.find('['));
133 if (!this->
cleanUp(outputConnection).isSuccess()) {
135 return(StatusCode::FAILURE);
136 }
137 return(StatusCode::SUCCESS);
138 }
140 ATH_MSG_DEBUG(
"commitOutput SKIPPED for uninitialized server.");
141 return(StatusCode::SUCCESS);
142 }
143 std::map<void*, RootType> commitCache;
146
147 const char* placementStr = nullptr;
150 if (
sc.isSuccess() && placementStr !=
nullptr && strlen(placementStr) > 6 && num > 0) {
151 const char * matchedChars = strstr(placementStr, "[FILE=");
152 if (!matchedChars){
153 ATH_MSG_ERROR(std::format(
"No matching filename in {}", placementStr));
155 }
159 ATH_MSG_ERROR(std::format(
"Failed to connectOutput for {}", fileName));
161 }
163 bool dataHeaderSeen = false;
164 std::string dataHeaderID;
165 while (num > 0) {
166 std::string objName = "ALL";
167 if (useDetailChronoStat()) {
168 objName = placementStr;
169 }
170
171 {
172 PMonUtils::BasicStopWatch stopWatch("cRep_" + objName, this->m_chronoMap);
173 std::string_view pStr = placementStr;
174 std::string::size_type cpos = pStr.find ("[CONT=");
175 if (cpos == std::string::npos) {
176 ATH_MSG_ERROR(std::format(
"No CONT field in placement string: {}", pStr));
177 return StatusCode::FAILURE;
178 }
179 std::string tokenStr (pStr.substr(0, cpos));
180 std::string contName (pStr.substr(cpos, std::string::npos));
181 std::string::size_type cl1 = contName.find(']');
182 if (cl1 == std::string::npos) {
183 ATH_MSG_ERROR(std::format(
"Missing close bracket after CONT field in placement string: {}", pStr));
184 return StatusCode::FAILURE;
185 }
186 tokenStr.append(contName, cl1 + 1);
187 contName = contName.substr(6, cl1 - 6);
188
189 std::string::size_type ppos = pStr.find ("[PNAME=");
190 if (ppos == std::string::npos) {
191 ATH_MSG_ERROR(std::format(
"No PNAME field in placement string: {}", pStr));
192 return StatusCode::FAILURE;
193 }
194 std::string className (pStr.substr(ppos, std::string::npos));
195 std::string::size_type cl2 = className.find(']');
196 if (cl2 == std::string::npos) {
197 ATH_MSG_ERROR(std::format(
"Missing close bracket after PNAME field in placement string: {}", pStr));
198 return StatusCode::FAILURE;
199 }
200 className = className.substr(7, cl2 - 7);
203 const std::string numStr = std::to_string(num);
205 bool foundContainer = false;
206 std::size_t opPos = contName.find('(');
208 foundContainer = true;
209 } else {
211 if (contName.compare(0, opPos, item) == 0){
212 foundContainer = true;
214 break;
215 }
216 }
217 }
218 if (len > 0 && foundContainer && contName[len] == '(' ) {
219 ServiceHandle<IIncidentSvc> incSvc(
"IncidentSvc",
name());
220
224 FileIncident beginInputIncident(
name(),
"BeginInputMemFile", memName);
225 incSvc->fireIncident(beginInputIncident);
226 FileIncident endInputIncident(
name(),
"EndInputMemFile", std::move(memName));
227 incSvc->fireIncident(endInputIncident);
228 }
230 }
231
232 ServiceHandle<IAthMetaDataSvc> metadataSvc(
"MetaDataSvc",
name());
234 sc = metadataSvc->shmProxy(std::format(
"{}[NUM={}]", pStr, numStr));
235 if (
sc.isRecoverable()) {
237 }
else if (
sc.isFailure()) {
240 }
241 } else {
242 Token readToken;
243 readToken.
setOid(Token::OID_t(num, 0));
247
248 if( m_oneDataHeaderForm.value() ) {
249 auto placementWithSwn = [&] { return std::format("{}[SWN={}]", placementStr, num); };
250 if( className == "DataHeaderForm_p6" ) {
251
253 "", placementWithSwn());
254 DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).ignore();
255 tokenStr = "";
256 } else {
257 Placement placement;
260 if (token == nullptr) {
263 }
264 tokenStr = token->toString();
265 }
266 if( className == "DataHeader_p6" ) {
267
269 tokenStr, placementWithSwn());
270 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
273 }
274 } else
275 if (className !=
"Token" && className !=
"DataHeaderForm_p6" && !classDesc.
IsFundamental()) {
276 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
277 }
278 placementStr = nullptr;
279 } else {
280
281 Placement placement;
282 placement.
fromString(placementStr); placementStr =
nullptr;
284 if (token == nullptr) {
287 }
288 tokenStr = token->toString();
289 if (className == "DataHeader_p6") {
290
293
294 if (!DHcnv->updateRep(&address, static_cast<DataObject*>(obj)).isSuccess()) {
297 }
298 dataHeaderSeen = true;
299
300
301
302
303
304
305 dataHeaderID = std::format("{}/{}/{}", token->contID(), numStr, token->dbID().toString());
306 } else if (dataHeaderSeen) {
307 dataHeaderSeen = false;
308
309
310 if (className == "DataHeaderForm_p6") {
311
313 tokenStr, dataHeaderID);
314 if (!DHcnv->updateRepRefs(&address, static_cast<DataObject*>(obj)).isSuccess()) {
315 ATH_MSG_ERROR(
"Failed updateRepRefs for obj = " << tokenStr);
317 }
318 } else {
319
320 GenericAddress address(0, 0, "", dataHeaderID);
321 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
324 }
325 }
326 }
327 if (className !=
"Token" && className !=
"DataHeaderForm_p6" && !classDesc.
IsFundamental()) {
328 commitCache.insert(std::pair<void*, RootType>(obj, classDesc));
329 }
330 }
331 }
332 }
333
335 while (
sc.isRecoverable()) {
337 }
338 if (!
sc.isSuccess()) {
341 }
342 }
344 while (
sc.isRecoverable()) {
346 }
347 if (
sc.isFailure()) {
348
350 }
351 }
352 if (dataHeaderSeen) {
353
354 GenericAddress address(0, 0, "", std::move(dataHeaderID));
355 if (!DHcnv->updateRepRefs(&address, nullptr).isSuccess()) {
358 }
359 }
360 placementStr = nullptr;
361 }
else if (
sc.isSuccess() && placementStr !=
nullptr && strncmp(placementStr,
"stop", 4) == 0) {
362 return(StatusCode::RECOVERABLE);
363 }
else if (
sc.isRecoverable() || num == -1) {
364 return(StatusCode::RECOVERABLE);
365 }
367 ServiceHandle<IIncidentSvc> incSvc(
"IncidentSvc",
name());
369 FileIncident beginInputIncident(
name(),
"BeginInputMemFile", memName);
370 incSvc->fireIncident(beginInputIncident);
371 FileIncident endInputIncident(
name(),
"EndInputMemFile", std::move(memName));
372 incSvc->fireIncident(endInputIncident);
373 if (
sc.isFailure()) {
374 ATH_MSG_INFO(
"All SharedWriter clients stopped - exiting");
375 } else {
377 }
378 return(StatusCode::FAILURE);
379 }
380 }
382 ATH_MSG_DEBUG(std::format(
"commitOutput SKIPPED for metadata-only server: {}", outputConnectionSpec));
383 return(StatusCode::SUCCESS);
384 }
385 if (outputConnection.empty()) {
386 outputConnection = std::move(fileName);
387 } else {
388 outputConnection = outputConnectionSpec;
391 }
392 }
393 std::size_t
merge = outputConnection.find(
"?pmerge=");
394 const std::string baseOutputConnection = outputConnection.substr(0, merge);
400 doCommit = true;
402 }
404 for (auto& [ptr, rootType] : commitCache) {
406 }
407 return(status);
408}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
virtual StatusCode commitOutput(const std::string &outputConnectionSpec, bool doCommit) override
Implementation of IConversionSvc: Commit pending output.
std::map< std::string, int > m_fileCommitCounter
Force SharedWriter to flush data to output file at given intervals, needed by parallel compression.
Gaudi::Property< std::string > m_streamPortString
Extension to use ROOT TMemFile for event data, "?pmerge=<host>:<port>".
virtual StatusCode cleanUp(const std::string &connection) override
Implement cleanUp to call all registered IAthenaPoolCleanUp cleanUp() functions.
virtual void setObjPtr(void *&obj, const Token *token) override
bool m_streamServerActive
StatusCode abortSharedWrClients(int client_n)
Send abort to SharedWriter clients if the server quits on error.
Gaudi::Property< std::string > m_metadataContainerProp
For SharedWriter: use MetadataSvc to merge data placed in a certain container.
virtual StatusCode connectOutput(const std::string &outputConnectionSpec, const std::string &openMode) override
Implementation of IConversionSvc: Connect to the output connection specification with open mode.
Gaudi::Property< std::vector< std::string > > m_metadataContainersAug
Gaudi::Property< bool > m_parallelCompression
Use Athena Object sharing for metadata only; event data is collected and sent via ROOT TMemFile.
Gaudi::Property< std::map< std::string, int > > m_fileFlushSetting
virtual Token * registerForWrite(Placement *placement, const void *obj, const RootType &classDesc) override
const std::string & auxString() const
Access auxiliary string.
Placement & fromString(const std::string &from)
Build from the string representation of a placement.
static TScopeAdapter ByNameNoQuiet(const std::string &name, Bool_t load=kTRUE)
Bool_t IsFundamental() const
Token & setAuxString(const std::string &auxString)
Set auxiliary string.
Token & setOid(const OID_t &oid)
Set object identifier.
static const DbType POOL_StorageType
merge(input_file_pattern, output_file)
Merge many input LHE files into a single output file.
char rootType(char typeidType)
This function is used internally in the code when creating primitive dynamic auxiliary branches.