ATLAS Offline Software
Public Member Functions | Public Attributes | Private Attributes | List of all members
python.MultipleStreamManager.MultipleStreamManager Class Reference
Collaboration diagram for python.MultipleStreamManager.MultipleStreamManager:

Public Member Functions

def __init__ (self)
 
def NewByteStream (self, StreamName, FileName="default")
 
def NewPoolStream (self, StreamName, FileName="default", asAlg=False, noTag=False)
 
def NewPoolRootStream (self, StreamName, FileName="default", asAlg=False)
 
def NewPoolRootStreamExtension (self, StreamName, Parent="StreamDAOD_PHYS", asAlg=False)
 
def NewVirtualStream (self, StreamName, FileName="default", asAlg=False)
 
def NewRootStream (self, StreamName, FileName=None, TreeName=None, asAlg=False)
 
def NewStream (self, StreamName, FileName="default", type='pool', asAlg=False, TreeName=None, Parent=None, noTag=False)
 
def GetStream (self, NameOrIndex)
 
def StreamExists (self, StreamName)
 
def Print (self)
 
def Lock (self)
 
def Unlock (self)
 
def AddItemToAllStreams (self, item)
 
def RemoveItemFromAllStreams (self, item)
 
def AddMetaDataItemToAllStreams (self, item)
 
def RemoveMetaDataItemFromAllStreams (self, item)
 
def RenameAllStreams (self, NameList)
 
def WriteSkimDecisionsOfAllStreams (self)
 

Public Attributes

 StreamList
 
 nStream
 
 StreamDict
 

Private Attributes

 _Locked
 

Detailed Description

This class helps managing multiple streams.
Normal users only manipulate their own streams with functions like NewStream or GetStream,
while commands like Lock, AddItemToAllStreams or RenameAllStreams are for the real manager
(e.g. RecExCommon or a job transform class)

Definition at line 566 of file MultipleStreamManager.py.

Constructor & Destructor Documentation

◆ __init__()

def python.MultipleStreamManager.MultipleStreamManager.__init__ (   self)

Definition at line 571 of file MultipleStreamManager.py.

571  def __init__(self):
572  self.StreamList=[]
573  self.nStream=0
574  self.StreamDict={}
575  self._Locked=False
576  return
577 

Member Function Documentation

◆ AddItemToAllStreams()

def python.MultipleStreamManager.MultipleStreamManager.AddItemToAllStreams (   self,
  item 
)

Definition at line 753 of file MultipleStreamManager.py.

753  def AddItemToAllStreams(self, item):
754  if self._Locked is True:
755  raise AssertionError("MSMgr is locked. AddItemToAllStreams cannot be used.")
756  for Stream in self.StreamList:
757  Stream.AddItem(item)
758  return
759 

◆ AddMetaDataItemToAllStreams()

def python.MultipleStreamManager.MultipleStreamManager.AddMetaDataItemToAllStreams (   self,
  item 
)

Definition at line 767 of file MultipleStreamManager.py.

767  def AddMetaDataItemToAllStreams(self, item):
768  if self._Locked is True:
769  raise AssertionError("MSMgr is locked. AddMetaDataItemToAllStreams cannot be used.")
770  for Stream in self.StreamList:
771  Stream.AddMetaDataItem(item)
772  return
773 

◆ GetStream()

def python.MultipleStreamManager.MultipleStreamManager.GetStream (   self,
  NameOrIndex 
)

Definition at line 714 of file MultipleStreamManager.py.

714  def GetStream(self, NameOrIndex):
715  #If NameOrIndex is an int, treat it as an index
716  if isinstance(NameOrIndex, int):
717  if NameOrIndex < self.nStream:
718  return self.StreamList[NameOrIndex]
719  else:
720  raise IndexError("ERROR: No stream with index %i is defined in MultipleStreamManager."%NameOrIndex)
721 
722  #else treat NameOrIndex as a name in the Stream Dictionary
723  try:
724  #Check wheter a stream with the same name already exists
725  index=self.StreamDict[NameOrIndex]
726  except KeyError:
727  raise NameError("Stream %s undefined!"%NameOrIndex)
728 
729  return self.StreamList[index]
730 

◆ Lock()

def python.MultipleStreamManager.MultipleStreamManager.Lock (   self)

Definition at line 745 of file MultipleStreamManager.py.

745  def Lock(self):
746  self._Locked=True
747  return
748 

◆ NewByteStream()

def python.MultipleStreamManager.MultipleStreamManager.NewByteStream (   self,
  StreamName,
  FileName = "default" 
)

Definition at line 578 of file MultipleStreamManager.py.

578  def NewByteStream(self,StreamName,FileName="default"):
579  if FileName=="default":
580  FileName=StreamName
581  return self.NewStream(StreamName,FileName,type='bytestream')
582 

◆ NewPoolRootStream()

def python.MultipleStreamManager.MultipleStreamManager.NewPoolRootStream (   self,
  StreamName,
  FileName = "default",
  asAlg = False 
)

Definition at line 586 of file MultipleStreamManager.py.

586  def NewPoolRootStream(self,StreamName,FileName="default", asAlg=False):
587  theStream = self.NewStream(StreamName,FileName,type='pool',asAlg=asAlg)
588  from AthenaCommon.AppMgr import theApp
589  from AthenaPoolCnvSvc import PoolAttributeHelper as pah
590  svcMgr = theApp.serviceMgr()
591 
592  theStream.Stream.WritingTool.SubLevelBranchName = "<key>"
593  # Use ZSTD w/ Level 5 for DAODs
594  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setFileCompAlg( FileName, "5" ) ]
595  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setFileCompLvl( FileName, "5" ) ]
596  # By default use a maximum basket buffer size of 128k and minimum buffer entries of 10
597  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setMaxBufferSize( FileName, "131072" ) ]
598  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setMinBufferEntries( FileName, "10" ) ]
599  # By default use 20 MB AutoFlush [or 100 (10) events for DAODs (everything else) for SharedWriter w/ parallel compression]
600  # for event data except for a number of select formats (see below)
601  TREE_AUTO_FLUSH = -20000000
602  from PyUtils.moduleExists import moduleExists
603  if moduleExists ('AthenaMP'): # AthenaMP not in AthAnalysis project
604  from AthenaMP.AthenaMPFlags import jobproperties as amjp
605  if amjp.AthenaMPFlags.UseSharedWriter() and amjp.AthenaMPFlags.UseParallelCompression():
606  TREE_AUTO_FLUSH = 100 if "DAOD_" in StreamName else 10
607  # By default use split-level 0 except for DAOD_PHYSLITE which is maximally split
608  CONTAINER_SPLITLEVEL = 0
609  if StreamName in ["StreamDAOD_PHYSVAL"]:
610  TREE_AUTO_FLUSH = 100
611  if StreamName in ["StreamDAOD_PHYS"]:
612  TREE_AUTO_FLUSH = 500
613  if StreamName in ["StreamDAOD_PHYSLITE", "StreamD2AOD_PHYSLITE"]:
614  TREE_AUTO_FLUSH = 1000
615  CONTAINER_SPLITLEVEL = 1
616  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setTreeAutoFlush( FileName, "CollectionTree", str(TREE_AUTO_FLUSH) ) ]
617  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setContainerSplitLevel( FileName, "CollectionTree", str(CONTAINER_SPLITLEVEL) ) ]
618  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setContainerSplitLevel( FileName, "Aux.", str(CONTAINER_SPLITLEVEL) ) ]
619  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setContainerSplitLevel( FileName, "Dyn.", "1" ) ]
620  return theStream
621 
622 

◆ NewPoolRootStreamExtension()

def python.MultipleStreamManager.MultipleStreamManager.NewPoolRootStreamExtension (   self,
  StreamName,
  Parent = "StreamDAOD_PHYS",
  asAlg = False 
)

Definition at line 623 of file MultipleStreamManager.py.

623  def NewPoolRootStreamExtension(self,StreamName,Parent="StreamDAOD_PHYS", asAlg=False):
624  index=0
625  FileName="default"
626  try:
627  #Check wheter a StreamDAOD_PHYS stream exists
628  index=MSMgr.StreamDict[Parent]
629  except KeyError:
630  index=-1
631  if index >= 0:
632  FileName=self.StreamList[index].Stream.OutputFile
633  self.StreamList[index].Stream.WritingTool.SaveDecisions = True
634  theStream = self.NewStream(StreamName,FileName,Parent=Parent,type='extension',asAlg=asAlg, noTag=True)
635  theStream.Stream.MetadataItemList = [ ]
636  theStream.Stream.ItemList = [ ]
637  theStream.Stream.WritingTool.SubLevelBranchName = "<key>"
638  theStream.Stream.WritingTool.OutputCollection = "POOLContainer_" + StreamName
639  theStream.Stream.WritingTool.PoolContainerPrefix = "CollectionTree_" + StreamName
640  from AthenaCommon.AppMgr import theApp
641  svcMgr = theApp.serviceMgr()
642  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ "DatabaseName = '" + FileName + "'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]
643  # By default use split-level 0 except for DAOD_PHYSLITE which is maximally split
644  from AthenaPoolCnvSvc import PoolAttributeHelper as pah
645  CONTAINER_SPLITLEVEL = 0
646  if Parent in ["StreamDAOD_PHYSVAL"]:
647  TREE_AUTO_FLUSH = 100
648  if Parent in ["StreamDAOD_PHYS"]:
649  TREE_AUTO_FLUSH = 500
650  if Parent in ["StreamDAOD_PHYSLITE", "StreamD2AOD_PHYSLITE"]:
651  TREE_AUTO_FLUSH = 1000
652  CONTAINER_SPLITLEVEL = 1
653  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setTreeAutoFlush( FileName, "CollectionTree_" + StreamName, str(TREE_AUTO_FLUSH) ) ]
654  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setContainerSplitLevel( FileName, "CollectionTree_" + StreamName, str(CONTAINER_SPLITLEVEL) ) ]
655  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setContainerSplitLevel( FileName, "Aux.", str(CONTAINER_SPLITLEVEL) ) ]
656  svcMgr.AthenaPoolCnvSvc.PoolAttributes += [ pah.setContainerSplitLevel( FileName, "Dyn.", "1" ) ]
657  return theStream
658 

◆ NewPoolStream()

def python.MultipleStreamManager.MultipleStreamManager.NewPoolStream (   self,
  StreamName,
  FileName = "default",
  asAlg = False,
  noTag = False 
)

Definition at line 583 of file MultipleStreamManager.py.

583  def NewPoolStream(self,StreamName,FileName="default", asAlg=False, noTag=False):
584  return self.NewStream(StreamName,FileName,type='pool',asAlg=asAlg, noTag=noTag)
585 

◆ NewRootStream()

def python.MultipleStreamManager.MultipleStreamManager.NewRootStream (   self,
  StreamName,
  FileName = None,
  TreeName = None,
  asAlg = False 
)
Function used to create a new D3PD 'stream'. The object returned by
   it behaves both as an AugmentedStreamBase, and as a D3PD::MakerAlg
   configurable.

   Arguments:
      StreamName: Logical name of the D3PD stream. Used also as the D3PD
          TTree name in case a tree name is not specified.
      FileName: Name of the output file into which the D3PD should be
        written. If not specified, '<StreamName>.root' is used.
      TreeName: Name of the TTree in the created file. If not specified,
        StreamName is used for the TTree name as well.
      asAlg: Selects whether the 'stream' should be configured as an
     actual stream, or an algorithm.

Definition at line 662 of file MultipleStreamManager.py.

662  def NewRootStream(self,StreamName,FileName=None,TreeName=None,asAlg=False):
663  """Function used to create a new D3PD 'stream'. The object returned by
664  it behaves both as an AugmentedStreamBase, and as a D3PD::MakerAlg
665  configurable.
666 
667  Arguments:
668  StreamName: Logical name of the D3PD stream. Used also as the D3PD
669  TTree name in case a tree name is not specified.
670  FileName: Name of the output file into which the D3PD should be
671  written. If not specified, '<StreamName>.root' is used.
672  TreeName: Name of the TTree in the created file. If not specified,
673  StreamName is used for the TTree name as well.
674  asAlg: Selects whether the 'stream' should be configured as an
675  actual stream, or an algorithm.
676  """
677  # Check if a file name was specified or not:
678  if FileName is None:
679  FileName = StreamName + ".root"
680  # Use the common function for creating the stream:
681  return self.NewStream( StreamName, FileName, type='root', asAlg = asAlg,
682  TreeName = TreeName )
683 

◆ NewStream()

def python.MultipleStreamManager.MultipleStreamManager.NewStream (   self,
  StreamName,
  FileName = "default",
  type = 'pool',
  asAlg = False,
  TreeName = None,
  Parent = None,
  noTag = False 
)

Definition at line 684 of file MultipleStreamManager.py.

684  def NewStream(self,StreamName,FileName="default",type='pool',asAlg=False,TreeName=None,Parent=None,noTag=False):
685  if FileName=="default":
686  FileName=StreamName+".pool.root"
687  try:
688  #Check wheter a stream with the same name already exists
689  index=self.StreamDict[StreamName]
690  except KeyError:
691  #The stream doesn't already exist. Register it and set it up.
692  #(This is expected, not actually an error.)
693  index=self.nStream
694  if type=='pool':
695  self.StreamList += [ AugmentedPoolStream(StreamName,FileName,asAlg,isVirtual=False,noTag=noTag) ]
696  elif type=='bytestream':
697  self.StreamList += [ AugmentedByteStream(StreamName,FileName) ]
698  elif type=='virtual':
699  self.StreamList += [ AugmentedPoolStream(StreamName,FileName,asAlg,isVirtual=True) ]
700  elif type=='root':
701  self.StreamList += [ AugmentedRootStream(StreamName,FileName,TreeName,asAlg) ]
702  elif type=='extension':
703  self.StreamList += [ AugmentedPoolStreamExtension(StreamName,FileName,Parent,asAlg,isVirtual=False,noTag=True) ]
704  else:
705  raise RuntimeError("Unknown type '%s'"%type)
706 
707  self.StreamDict[StreamName]=index
708  self.nStream+=1
709  else:
710  #This is the real error case...
711  raise NameError("Stream %s already exists"%StreamName)
712  return self.StreamList[index]
713 

◆ NewVirtualStream()

def python.MultipleStreamManager.MultipleStreamManager.NewVirtualStream (   self,
  StreamName,
  FileName = "default",
  asAlg = False 
)

Definition at line 659 of file MultipleStreamManager.py.

659  def NewVirtualStream(self,StreamName,FileName="default", asAlg=False):
660  return self.NewStream(StreamName,FileName,type='virtual',asAlg=asAlg)
661 

◆ Print()

def python.MultipleStreamManager.MultipleStreamManager.Print (   self)

Definition at line 734 of file MultipleStreamManager.py.

734  def Print(self):
735  print("**** MultipleStreamManager INFOS ****")
736  print("Number of streams:", self.nStream)
737  i=0
738  for Stream in self.StreamList:
739  print("----------------------- Stream #",i," -----------------------")
740  Stream.Print()
741  i+=1
742  return
743 

◆ RemoveItemFromAllStreams()

def python.MultipleStreamManager.MultipleStreamManager.RemoveItemFromAllStreams (   self,
  item 
)

Definition at line 760 of file MultipleStreamManager.py.

760  def RemoveItemFromAllStreams(self, item):
761  if self._Locked is True:
762  raise AssertionError("MSMgr is locked. RemoveItemFromAllStreams cannot be used.")
763  for Stream in self.StreamList:
764  Stream.RemoveItem(item)
765  return
766 

◆ RemoveMetaDataItemFromAllStreams()

def python.MultipleStreamManager.MultipleStreamManager.RemoveMetaDataItemFromAllStreams (   self,
  item 
)

Definition at line 774 of file MultipleStreamManager.py.

774  def RemoveMetaDataItemFromAllStreams(self, item):
775  if self._Locked is True:
776  raise AssertionError("MSMgr is locked. AddMetaDataItemFromAllStreams cannot be used.")
777  for Stream in self.StreamList:
778  Stream.RemoveMetaDataItem(item)
779  return
780 

◆ RenameAllStreams()

def python.MultipleStreamManager.MultipleStreamManager.RenameAllStreams (   self,
  NameList 
)

Definition at line 781 of file MultipleStreamManager.py.

781  def RenameAllStreams(self, NameList):
782  if self._Locked is True:
783  raise AssertionError("MSMgr is locked. RenameAllStreams cannot be used.")
784  if not isinstance(NameList, list):
785  raise TypeError("RenameAllStreams does not accep arguments of type %s"%type(NameList))
786  if len(NameList) != self.nStream:
787  raise IndexError("NameList needs to have the same length as self.StreamList.")
788 
789  i=0
790  while i<self.nStream:
791  self.StreamList[i].SetOutputFileName(NameList[i])
792  i+=1
793  return
794 

◆ StreamExists()

def python.MultipleStreamManager.MultipleStreamManager.StreamExists (   self,
  StreamName 
)

Definition at line 731 of file MultipleStreamManager.py.

731  def StreamExists(self, StreamName):
732  return StreamName in self.StreamDict
733 

◆ Unlock()

def python.MultipleStreamManager.MultipleStreamManager.Unlock (   self)

Definition at line 749 of file MultipleStreamManager.py.

749  def Unlock(self):
750  self._Locked=False
751  return
752 

◆ WriteSkimDecisionsOfAllStreams()

def python.MultipleStreamManager.MultipleStreamManager.WriteSkimDecisionsOfAllStreams (   self)

Definition at line 795 of file MultipleStreamManager.py.

795  def WriteSkimDecisionsOfAllStreams(self):
796  if self._Locked:
797  raise AssertionError("MSMgr is locked. WriteSkimDecisionsOfAllStreams cannot be used.")
798 
799  from AthenaCommon.AlgSequence import AlgSequence
800  topSequence = AlgSequence()
801  for Stream in self.StreamList:
802  if Stream.GetAcceptAlgs() or Stream.GetOtherAlgsToBookkeep() or Stream.GetRequireAlgs() or Stream.GetVetoAlgs():
803  sdw=Stream.GetSkimDecisionsWriter()
804  topSequence+=sdw
805  if isinstance(Stream,AugmentedPoolStream):
806  Stream.AddItem("SkimDecisionCollection#"+sdw.SkimDecisionsContainerName)
807  return
808 
809 

Member Data Documentation

◆ _Locked

python.MultipleStreamManager.MultipleStreamManager._Locked
private

Definition at line 575 of file MultipleStreamManager.py.

◆ nStream

python.MultipleStreamManager.MultipleStreamManager.nStream

Definition at line 573 of file MultipleStreamManager.py.

◆ StreamDict

python.MultipleStreamManager.MultipleStreamManager.StreamDict

Definition at line 574 of file MultipleStreamManager.py.

◆ StreamList

python.MultipleStreamManager.MultipleStreamManager.StreamList

Definition at line 572 of file MultipleStreamManager.py.


The documentation for this class was generated from the following file:
python.AlgSequence.AlgSequence
AlgSequence
Definition: PhysicsAnalysis/D3PDTools/AnaAlgorithm/python/AlgSequence.py:7
python.processes.powheg.ZZ.ZZ.__init__
def __init__(self, base_directory, **kwargs)
Constructor: all process options are set here.
Definition: ZZ.py:18
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
Muon::print
std::string print(const MuPatSegment &)
Definition: MuonTrackSteering.cxx:28
dqm_persistency::Print
void Print(const PParameter *param, TDirectory *topdir, Option_t *opt="")
Definition: dqm_persistency_impl.cxx:161
str
Definition: BTagTrackIpAccessor.cxx:11