8 from __future__
import print_function
12 """This class manages common methods of AugmentedPoolStream and AugmentedByteStream."""
26 if not isinstance(aList, list):
27 raise TypeError(
"AugmentedPoolStream._ItemInList() does not support aList of type %s"%
type(aList))
37 if isinstance(item,list):
41 elif isinstance(item, str):
45 raise TypeError(
"AugmentedPoolStream._AddValidItemToList() does not support item of type %s"%
type(item))
50 if isinstance(item,list):
54 elif isinstance(item, str):
58 print(self.
Name,
": WARNING you asked to remove item '%s' but this item is not present",item)
60 raise TypeError(
"AugmentedPoolStream._RemoveValidItemFromList() does not support item of type %s"%
type(item))
74 return self.Stream.AcceptAlgs
87 return self.Stream.RequireAlgs
100 return self.Stream.VetoAlgs
118 from EventBookkeeperTools.BookkeepingInfoWriter
import SkimDecisionsWriter
119 sdw=SkimDecisionsWriter(self.
Name+
"_SkimDecisionsWriter")
120 newContainerName=self.
Name+
"_"+sdw.SkimDecisionsContainerName
121 sdw.SkimDecisionsContainerName=newContainerName
137 if not hasattr(self,
"_Prescaler"):
138 from PrimaryDPDMaker.PrimaryDPDMakerConf
import PrimaryDPDPrescaler
139 prescalerName=
"MasterPrescale_"+self.
Name
145 self.
_Prescaler.RequireAlgs = self.Stream.RequireAlgs[:]
146 self.
_Prescaler.AcceptAlgs = self.Stream.AcceptAlgs[:]
148 from AthenaCommon.AlgSequence
import AlgSequence
157 if hasattr(self,
"_Prescaler"):
166 """This class manages the associated event-by-event and metadata AthenaOutputStreams as a single object."""
167 def __init__(self, StreamName, FileName, asAlg, isVirtual, noTag=False):
168 AugmentedStreamBase.__init__(self,StreamName)
170 from AthenaPoolCnvSvc.WriteAthenaPool
import AthenaPoolOutputStream
172 if isVirtual
is True:
173 self.
Stream.WriteOnExecute=
False
174 self.
Stream.WriteOnFinalize=
False
188 self.
Stream.OutputFile = name
202 return self.
Stream.ItemList
213 return self.
Stream.MetadataItemList
219 self.
Stream.AcceptAlgs=AlgsList
226 self.
Stream.RequireAlgs=AlgsList
233 self.
Stream.VetoAlgs=AlgsList
238 print(
"**** AugmentedPoolStream",self.
Name,
"****")
239 print(
"Output file:")
243 print(
"RequireAlgs:")
247 print(
"OtherAlgs to bookkeep (but not directly used by the Stream):")
249 print(
"Master prescale:")
253 print(
"MetaData ItemList:")
260 def __init__(self, StreamName, FileName, Parent, asAlg, isVirtual, noTag=True):
261 AugmentedPoolStream.__init__(self, StreamName, FileName, asAlg, isVirtual, noTag)
265 index=MSMgr.StreamDict[Parent]
276 if item
in MSMgr.StreamList[self.
parentIndex].Stream.ItemList:
287 class AugmentedByteStream( AugmentedStreamBase ):
289 AugmentedStreamBase.__init__(self,StreamName)
292 from AthenaCommon.AppMgr
import theApp
293 svcMgr = theApp.serviceMgr()
296 from ByteStreamCnvSvc.ByteStreamCnvSvcConf
import ByteStreamEventStorageOutputSvc,ByteStreamOutputStreamCopyTool
306 OutputDirectory=outDir,
307 SimpleFileName=FileName )
312 self.
bsCopyTool.ByteStreamInputSvc=svcMgr.ByteStreamInputSvc
315 from AthenaServices.AthenaServicesConf
import AthenaOutputStream
319 from AthenaCommon.AlgSequence
import AthSequencer
321 outSequence += self.
Stream
343 print(
"**** AugmentedByteStream",self.
Name,
"****")
344 print(
"Output file:")
348 print(
"RequireAlgs:")
352 print(
"OtherAlgs to bookkeep (but not directly used by the Stream):")
354 print(
"Master prescale:")
357 print(
"Not available for bytestream")
362 """This class is used to help the AugmentedRootStream class in handling
363 filter algorithms in the same way as they behaved with the 'old' way of
364 setting up the D3PD::MakerAlg algorithm.
372 self.
stream.AddRequireAlgs( alg.getName() )
376 """This class is used to handle output ROOT (D3PD) streams in Athena.
377 It inherits from the AugmentedStreamBase class, so implements its
378 interface, but at the same time it behaves for all intents and
379 purposes like a configurable for the D3PD::MakerAlg class.
381 def __init__( self, StreamName, FileName, TreeName = None, asAlg = False ):
382 """Constructor for the D3PD stream object.
385 StreamName: Logical name of the D3PD stream. Note that besides
386 using it to define the stream in THistSvc, this
387 name is also used as the name of the TTree in the
388 output file in case one is not specified explicitly.
389 FileName: Name of the file to write the D3PD TTree into.
390 TreeName: Name of the TTree in the output file. If it's not
391 specified, the stream name is used as the tree name.
392 asAlg: If set to True, the D3PD::MakerAlg algorithm is added
393 to the job as a regular algorithm. When set to False
394 (default), the D3PD algorithm is added to the application
395 manager as an output stream.
398 AugmentedStreamBase.__init__( self, StreamName )
402 TreeName = StreamName
409 from AthenaCommon.AlgSequence
import AlgSequence
413 from D3PDMakerConfig.D3PDMakerFlags
import D3PDMakerFlags
414 preseq =
AlgSequence( D3PDMakerFlags.PreD3PDAlgSeqName(),
415 StopOverride =
True )
416 if not hasattr( topSequence, D3PDMakerFlags.PreD3PDAlgSeqName() ):
417 topSequence += [ preseq ]
423 ParentStreamName = StreamName.split(
':' )[ 0 ]
424 if StreamName.count(
':' ) != 0:
425 if StreamName.count(
':' ) == 1:
426 StreamName = StreamName.split(
':' )[ 1 ]
428 raise AttributeError(
"Stream name '%s' can't be used!" % StreamName )
429 if not hasattr( topSequence, ParentStreamName +
"AANTStream" ):
431 from AnalysisTools.AnalysisToolsConf
import AANTupleStream
433 ExtraRefNames = [
'StreamRDO',
437 OutputName = FileName,
438 WriteInputDataHeader =
True,
439 StreamName = ParentStreamName )
442 print(self.
Name,
": INFO didn't find AnalysisTools.AnalysisToolsConf in release.")
444 print(traceback.format_exc())
448 from AthenaCommon.AppMgr
import ServiceMgr
449 if not hasattr( ServiceMgr,
'THistSvc' ):
450 from GaudiSvc.GaudiSvcConf
import THistSvc
455 for s
in ServiceMgr.THistSvc.Output:
456 stream = s.split()[ 0 ]
457 if stream == StreamName:
463 ServiceMgr.THistSvc.Output += [
"%s DATAFILE='%s' OPT='RECREATE' CL='%i'" %
464 ( StreamName, FileName,
465 D3PDMakerFlags.CompressionLevel() ) ]
477 import D3PDMakerCoreComps
483 self.
Stream = D3PDMakerCoreComps.MakerAlg( StreamName +
"D3PDMaker", seq = theseq,
484 file = FileName, stream = ParentStreamName,
485 tuplename = TreeName,
486 D3PDSvc =
"D3PD::RootD3PDSvc" )
489 from AthenaCommon.AppMgr
import theApp
490 theApp.addOutputStream( self.
Stream )
496 print(self.
Name,
": INFO didn't find D3PDMakerCoreComps in release.")
502 """It's not impossible to change the file name after the stream has been created,
503 but I didn't want to write this code unless there's actual need for it.
505 print(
"**** ERROR: Can't change the name of the output ROOT file! ****")
517 print(
"**** AugmentedRootStream",self.
Name,
"****")
518 print(
"Output file:")
524 print(
"RequireAlgs:")
528 print(
"OtherAlgs to bookkeep (but not directly used by the Stream):")
530 print(
"Master prescale:")
533 print(
" Not available for ROOT (D3PD) stream")
537 """This function makes it possible to add D3PDObject-s to this object in the same
538 way as they are added to D3PDMakerCoreComps.MakerAlg objects..__setattr__( name, value )
544 """This function forwards attribute requests which don't exist in this object to
545 the D3PDMakerCoreComps.MakerAlg object.
547 if 'Stream' in self.__dict__
and hasattr( self.
Stream, attr ):
548 return getattr( self.
Stream, attr )
549 raise AttributeError(
"D3PD Maker algorithm doesn't have property '%s'." % attr )
553 """This function forwards attribute setting requests to the D3PDMakerCoreComps.MakerAlg
556 if hasattr( self,
"Stream" ):
557 if hasattr( self.
Stream, name ):
562 self.__dict__[ name ] = value
567 """This class helps manage multiple streams.
568 Normal users only manipulate their own streams with functions like NewStream or GetStream,
569 while commands like Lock, AddItemToAllStreams or RenameAllStreams are for the real manager
570 (e.g. RecExCommon or a job transform class)"""
579 if FileName==
"default":
581 return self.
NewStream(StreamName,FileName,type=
'bytestream')
583 def NewPoolStream(self,StreamName,FileName="default", asAlg=False, noTag=False):
584 return self.
NewStream(StreamName,FileName,type=
'pool',asAlg=asAlg, noTag=noTag)
587 theStream = self.
NewStream(StreamName,FileName,type=
'pool',asAlg=asAlg)
588 from AthenaCommon.AppMgr
import theApp
589 from AthenaPoolCnvSvc
import PoolAttributeHelper
as pah
590 svcMgr = theApp.serviceMgr()
592 theStream.Stream.WritingTool.SubLevelBranchName =
"<key>"
594 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setFileCompAlg( FileName,
"5" ) ]
595 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setFileCompLvl( FileName,
"5" ) ]
597 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setMaxBufferSize( FileName,
"131072" ) ]
598 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setMinBufferEntries( FileName,
"10" ) ]
601 TREE_AUTO_FLUSH = -20000000
602 from PyUtils.moduleExists
import moduleExists
603 if moduleExists (
'AthenaMP'):
604 from AthenaMP.AthenaMPFlags
import jobproperties
as amjp
605 if amjp.AthenaMPFlags.UseSharedWriter()
and amjp.AthenaMPFlags.UseParallelCompression():
606 TREE_AUTO_FLUSH = 100
if "DAOD_" in StreamName
else 10
608 CONTAINER_SPLITLEVEL = 0
609 if StreamName
in [
"StreamDAOD_PHYSVAL"]:
610 TREE_AUTO_FLUSH = 100
611 if StreamName
in [
"StreamDAOD_PHYS"]:
612 TREE_AUTO_FLUSH = 500
613 if StreamName
in [
"StreamDAOD_PHYSLITE",
"StreamD2AOD_PHYSLITE"]:
614 TREE_AUTO_FLUSH = 1000
615 CONTAINER_SPLITLEVEL = 1
616 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setTreeAutoFlush( FileName,
"CollectionTree",
str(TREE_AUTO_FLUSH) ) ]
617 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setContainerSplitLevel( FileName,
"CollectionTree",
str(CONTAINER_SPLITLEVEL) ) ]
618 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setContainerSplitLevel( FileName,
"Aux.",
str(CONTAINER_SPLITLEVEL) ) ]
619 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setContainerSplitLevel( FileName,
"Dyn.",
"1" ) ]
628 index=MSMgr.StreamDict[Parent]
632 FileName=self.
StreamList[index].Stream.OutputFile
633 self.
StreamList[index].Stream.WritingTool.SaveDecisions =
True
634 theStream = self.
NewStream(StreamName,FileName,Parent=Parent,type=
'extension',asAlg=asAlg, noTag=
True)
635 theStream.Stream.MetadataItemList = [ ]
636 theStream.Stream.ItemList = [ ]
637 theStream.Stream.WritingTool.SubLevelBranchName =
"<key>"
638 theStream.Stream.WritingTool.OutputCollection =
"POOLContainer_" + StreamName
639 theStream.Stream.WritingTool.PoolContainerPrefix =
"CollectionTree_" + StreamName
640 from AthenaCommon.AppMgr
import theApp
641 svcMgr = theApp.serviceMgr()
642 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [
"DatabaseName = '" + FileName +
"'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]
644 from AthenaPoolCnvSvc
import PoolAttributeHelper
as pah
645 CONTAINER_SPLITLEVEL = 0
646 if Parent
in [
"StreamDAOD_PHYSVAL"]:
647 TREE_AUTO_FLUSH = 100
648 if Parent
in [
"StreamDAOD_PHYS"]:
649 TREE_AUTO_FLUSH = 500
650 if Parent
in [
"StreamDAOD_PHYSLITE",
"StreamD2AOD_PHYSLITE"]:
651 TREE_AUTO_FLUSH = 1000
652 CONTAINER_SPLITLEVEL = 1
653 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setTreeAutoFlush( FileName,
"CollectionTree_" + StreamName,
str(TREE_AUTO_FLUSH) ) ]
654 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setContainerSplitLevel( FileName,
"CollectionTree_" + StreamName,
str(CONTAINER_SPLITLEVEL) ) ]
655 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setContainerSplitLevel( FileName,
"Aux.",
str(CONTAINER_SPLITLEVEL) ) ]
656 svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setContainerSplitLevel( FileName,
"Dyn.",
"1" ) ]
660 return self.
NewStream(StreamName,FileName,type=
'virtual',asAlg=asAlg)
663 """Function used to create a new D3PD 'stream'. The object returned by
664 it behaves both as an AugmentedStreamBase, and as a D3PD::MakerAlg
668 StreamName: Logical name of the D3PD stream. Used also as the D3PD
669 TTree name in case a tree name is not specified.
670 FileName: Name of the output file into which the D3PD should be
671 written. If not specified, '<StreamName>.root' is used.
672 TreeName: Name of the TTree in the created file. If not specified,
673 StreamName is used for the TTree name as well.
674 asAlg: Selects whether the 'stream' should be configured as an
675 actual stream, or an algorithm.
679 FileName = StreamName +
".root"
681 return self.
NewStream( StreamName, FileName, type=
'root', asAlg = asAlg,
682 TreeName = TreeName )
684 def NewStream(self,StreamName,FileName="default",type='pool',asAlg=False,TreeName=None,Parent=None,noTag=False):
685 if FileName==
"default":
686 FileName=StreamName+
".pool.root"
696 elif type==
'bytestream':
698 elif type==
'virtual':
702 elif type==
'extension':
705 raise RuntimeError(
"Unknown type '%s'"%type)
711 raise NameError(
"Stream %s already exists"%StreamName)
716 if isinstance(NameOrIndex, int):
720 raise IndexError(
"ERROR: No stream with index %i is defined in MultipleStreamManager."%NameOrIndex)
727 raise NameError(
"Stream %s undefined!"%NameOrIndex)
735 print(
"**** MultipleStreamManager INFOS ****")
739 print(
"----------------------- Stream #",i,
" -----------------------")
755 raise AssertionError(
"MSMgr is locked. AddItemToAllStreams cannot be used.")
762 raise AssertionError(
"MSMgr is locked. RemoveItemFromAllStreams cannot be used.")
764 Stream.RemoveItem(item)
769 raise AssertionError(
"MSMgr is locked. AddMetaDataItemToAllStreams cannot be used.")
771 Stream.AddMetaDataItem(item)
776 raise AssertionError(
"MSMgr is locked. AddMetaDataItemFromAllStreams cannot be used.")
778 Stream.RemoveMetaDataItem(item)
783 raise AssertionError(
"MSMgr is locked. RenameAllStreams cannot be used.")
784 if not isinstance(NameList, list):
785 raise TypeError(
"RenameAllStreams does not accep arguments of type %s"%
type(NameList))
786 if len(NameList) != self.
nStream:
787 raise IndexError(
"NameList needs to have the same length as self.StreamList.")
791 self.
StreamList[i].SetOutputFileName(NameList[i])
797 raise AssertionError(
"MSMgr is locked. WriteSkimDecisionsOfAllStreams cannot be used.")
799 from AthenaCommon.AlgSequence
import AlgSequence
802 if Stream.GetAcceptAlgs()
or Stream.GetOtherAlgsToBookkeep()
or Stream.GetRequireAlgs()
or Stream.GetVetoAlgs():
803 sdw=Stream.GetSkimDecisionsWriter()
805 if isinstance(Stream,AugmentedPoolStream):
806 Stream.AddItem(
"SkimDecisionCollection#"+sdw.SkimDecisionsContainerName)
813 if 'MSMgr' in vars():
814 raise RuntimeError(
"MSMgr already exists?!? This will almost certainly create erroneous results.")