ATLAS Offline Software
PATTransformUtils.py
# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration


# Get the base logger for the transforms and extend it for us
from PyJobTransforms.trfLogger import msg
msg = msg.getChild(__name__)

import PyJobTransforms.trfArgClasses as trfArgClasses

from PyJobTransforms.trfArgs import getExtraDPDList
from PyJobTransforms.trfExe import NTUPMergeExecutor, POOLMergeExecutor, NtupPhysValPostProcessingExecutor

def addPhysValidationMergeFiles(parser):
    """Add the physics validation merge file arguments to a transform parser."""
    # TODO: Better to somehow auto-import this from PhysicsAnalysis/PhysicsValidation/PhysValMonitoring
    # Use an arggroup to put these arguments in their own sub-section (of --help)
    parser.defineArgGroup('PhysValMerge', 'Physics Validation merge job specific options')
    parser.add_argument('--inputNTUP_PHYSVALFile',
                        type=trfArgClasses.argFactory(trfArgClasses.argNTUPFile, io='input'),
                        help='Input physics validation file', group='PhysValMerge', nargs='+')
    parser.add_argument('--outputNTUP_PHYSVAL_MRGFile',
                        type=trfArgClasses.argFactory(trfArgClasses.argNTUPFile, io='output'),
                        help='Output merged physics validation file', group='PhysValMerge')
    parser.add_argument('--skipPostProcessing',
                        action='store_true',
                        default=False,
                        help='If given, skip the post-processing step and just do the merging',
                        group='PhysValMerge')

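# On the command line of the merge transform these arguments look roughly like the
# following (a hedged example; the transform name 'NTUPMerge_tf.py' and the file
# names are illustrative, not defined in this module):
#
#     NTUPMerge_tf.py --inputNTUP_PHYSVALFile physval1.root physval2.root \
#                     --outputNTUP_PHYSVAL_MRGFile physval_merged.root \
#                     --skipPostProcessing
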
def addNTUPMergeSubsteps(executorSet, skip_post_processing=False):
    """Add the NTUP_PHYSVAL merge substep and, unless skipped, its post-processing substeps."""
    # Ye olde NTUPs
    intermediateStep = 'NTUP_PHYSVAL_MRG0'
    try:
        if skip_post_processing:
            msg.info("User requested to skip post-processing ('--skipPostProcessing' is %s), so only the merge step will be run.", skip_post_processing)
            out_data = ['NTUP_PHYSVAL_MRG']
        else:
            msg.info("We'll run merging and post-processing (implemented for ID track monitoring, EGamma and BTagging).")
            out_data = [intermediateStep]

        executorSet.add(NTUPMergeExecutor(name='NTUPLEMergePHYSVAL', exe='hadd', inData=['NTUP_PHYSVAL'], outData=out_data, exeArgs=[]))

        if not skip_post_processing:
            executorSet.add(NTUPMergeExecutor(name='NTUPLEPHYSVALIDTrackingPostProc', exe='postProcessIDPVMHistos', inData=[intermediateStep], outData=['NTUP_PHYSVAL_MRG1'], exeArgs=[]))
            executorSet.add(NtupPhysValPostProcessingExecutor(name='NTUPLEPHYSVALPostProc', exe='physvalPostProcessing.py', inData=['NTUP_PHYSVAL_MRG1'], outData=['NTUP_PHYSVAL_MRG'], exeArgs=[]))

        # Extra Tier-0 NTUPs
        extraNTUPs = getExtraDPDList(NTUPOnly=True)
        for ntup in extraNTUPs:
            executorSet.add(NTUPMergeExecutor(name='NTUPLEMerge' + ntup.name.replace('_', ''), exe='hadd', inData=[ntup.name], outData=[ntup.name + '_MRG'], exeArgs=[]))
    except ImportError as e:
        msg.warning("Failed to get D3PD lists - probably D3PDs are broken in this release: {0}".format(e))


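# Typical wiring into a transform (a minimal sketch, assuming the usual
# PyJobTransforms pattern; the description string and the calling script are
# illustrative, not part of this module):
#
#     import sys
#     from PyJobTransforms.transform import transform
#     from PyJobTransforms.trfArgs import addAthenaArguments
#
#     executorSet = set()
#     addNTUPMergeSubsteps(executorSet)
#     trf = transform(executor=executorSet, description='Physics validation NTUP merge')
#     addAthenaArguments(trf.parser)
#     addPhysValidationMergeFiles(trf.parser)
#     trf.parseCmdLineArgs(sys.argv[1:])
#     trf.execute()
#     trf.generateReport()
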
def addDAODArguments(parser, mergerTrf=True):
    """Add DAOD file arguments for every known DAOD type (input and output for a merge transform, output only otherwise)."""
    DAODTypes = knownDAODTypes()
    if mergerTrf:
        parser.defineArgGroup('Input DAOD', 'Input DAOD files to be merged')
        parser.defineArgGroup('Output DAOD', 'Output merged DAOD files')
        for DAOD in DAODTypes:
            parser.add_argument("--input" + DAOD + "File", nargs="+",
                                type=trfArgClasses.argFactory(trfArgClasses.argPOOLFile, io="input", type="AOD", subtype=DAOD),
                                help="Input DAOD file of " + DAOD + " derivation", group="Input DAOD")
            parser.add_argument("--output" + DAOD + "_MRGFile",
                                type=trfArgClasses.argFactory(trfArgClasses.argPOOLFile, io="output", type="AOD", subtype=DAOD),
                                help="Output merged DAOD file of " + DAOD + " derivation", group="Output DAOD")
    else:
        parser.defineArgGroup('Output DAOD', 'Output derivation DAOD files')
        for DAOD in DAODTypes:
            parser.add_argument("--output" + DAOD + "File",
                                type=trfArgClasses.argFactory(trfArgClasses.argPOOLFile, io="output", type="AOD", subtype=DAOD),
                                help="Output DAOD file of " + DAOD + " derivation", group="Output DAOD")


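# For example, for a DAOD type 'DAOD_PHYS' (assuming it appears in the derivation
# framework's list) a merge transform gets '--inputDAOD_PHYSFile' and
# '--outputDAOD_PHYS_MRGFile', while a derivation transform gets '--outputDAOD_PHYSFile'.
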
def addDAODMergerSubsteps(executorSet):
    """Add one POOL merge substep per known DAOD type to the executor set."""
    DAODTypes = knownDAODTypes()
    for DAOD in DAODTypes:
        executorSet.add(POOLMergeExecutor(name=DAOD.removeprefix("DAOD_") + 'Merge', inData=[DAOD], outData=[DAOD + '_MRG']))

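# With the same illustrative 'DAOD_PHYS' type, this adds a POOLMergeExecutor named
# 'PHYSMerge' with inData=['DAOD_PHYS'] and outData=['DAOD_PHYS_MRG'].
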
def knownDAODTypes():
    """Import the list of known DAODs from the derivation framework and strip the 'Stream' prefix from each name."""
    DAODTypes = []
    try:
        from DerivationFrameworkCore.DerivationFrameworkProdFlags import listAODtoDPD
        DAODTypes = [name.removeprefix("Stream") for name in listAODtoDPD]
    except ImportError:
        msg.warning("Could not import DAOD subtypes from DerivationFrameworkCore.DerivationFrameworkProdFlags")
    return DAODTypes
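
# Example (hedged): an entry such as 'StreamDAOD_PHYS' in listAODtoDPD would be
# reported by knownDAODTypes() as 'DAOD_PHYS'.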