def fromRunArgs(runArgs):
    """Configure and run a derivation job from transform run arguments.

    Builds the configuration flags from ``runArgs``, assembles the component
    accumulator for every requested derived format, runs it, and terminates
    the process with ``sys.exit`` (exit status 0 on success, 1 otherwise),
    so this function never returns normally.

    Args:
        runArgs: transform run-argument object. Attributes read here:
            exactly one of ``inputAODFile``, ``inputDAOD_PHYSFile``,
            ``inputDAOD_PHYSLITEFile`` or ``inputEVNTFile``; ``formats``
            (required); optionally ``augmentations`` (``child:parent``
            strings), ``skimmingExpression``, ``skimmingContainers``,
            ``runNumber``, ``passThrough`` and any ``output...File``
            attributes.

    Raises:
        ValueError: if the number of provided input types is not exactly
            one, an augmentation is not of the form ``child:parent``, no
            formats are given, or command-line skimming is requested with
            a format other than ``SKIM`` or without exactly one of the
            DAOD_PHYS / DAOD_PHYSLITE inputs.
    """
    from AthenaCommon.Logging import logging
    logDerivation = logging.getLogger('Derivation')
    logDerivation.info('****************** STARTING DERIVATION *****************')

    logDerivation.info('**** Transformation run arguments')
    logDerivation.info(str(runArgs))

    logDerivation.info('**** Setting-up configuration flags')
    from AthenaConfiguration.AllConfigFlags import initConfigFlags
    flags = initConfigFlags()
    flags.Exec.EventPrintoutInterval = 100
    commonRunArgsToFlags(runArgs, flags)

    flags.Common.ProductionStep = ProductionStep.Derivation

    # Performance-monitoring flags
    from PerfMonComps.PerfMonConfigHelpers import setPerfmonFlagsFromRunArgs
    setPerfmonFlagsFromRunArgs(flags, runArgs)

    # Input: exactly one of the allowed input types must be provided
    allowedInputTypes = ['AOD', 'DAOD_PHYS', 'DAOD_PHYSLITE', 'EVNT']
    availableInputTypes = [hasattr(runArgs, f'input{inputType}File') for inputType in allowedInputTypes]
    if sum(availableInputTypes) != 1:
        # Report the allowed type names (not the per-type presence booleans)
        raise ValueError('Input must be exactly one of the following types: '
                         + ','.join(allowedInputTypes))
    idx = availableInputTypes.index(True)
    selectedInputType = allowedInputTypes[idx]
    flags.Input.Files = getattr(runArgs, f'input{selectedInputType}File')

    # Augmentations, given as 'child:parent'. Each child records its parent
    # stream; each parent accumulates the list of its child streams.
    if hasattr(runArgs, 'augmentations'):
        for val in runArgs.augmentations:
            parts = val.split(':')
            if len(parts) != 2:
                logDerivation.error('Derivation job started, but with wrong augmentation syntax - aborting')
                raise ValueError('Invalid augmentation argument: {0}'.format(val))
            child, parent = parts
            flags.addFlag(f'Output.DAOD_{child}ParentStream', f'DAOD_{parent}')
            childStreamFlag = f'Output.DAOD_{parent}ChildStream'
            if not flags.hasFlag(childStreamFlag):
                flags.addFlag(childStreamFlag, [f'DAOD_{child}'])
            else:
                flags._set(childStreamFlag, flags._get(childStreamFlag) + [f'DAOD_{child}'])
            logDerivation.info('Setting up event augmentation as %s => %s', child, parent)

    # Output formats are mandatory
    if not hasattr(runArgs, 'formats'):
        logDerivation.error('Derivation job started, but with no output formats specified - aborting')
        raise ValueError('No derived formats specified')
    formats = runArgs.formats
    logDerivation.info('Will attempt to make the following derived formats: %s', formats)

    # Command-line skimming: only for the SKIM format running on exactly one
    # of the DAOD_PHYS / DAOD_PHYSLITE inputs
    if getattr(runArgs, 'skimmingExpression', None):
        if not (len(formats) == 1 and formats[0] == 'SKIM'):
            raise ValueError('Command-line skimming only available with SKIM format')
        skimmingInputTypes = ['DAOD_PHYS', 'DAOD_PHYSLITE']
        availableSkimmingInputs = [hasattr(runArgs, f'input{inputType}File') for inputType in skimmingInputTypes]
        if sum(availableSkimmingInputs) != 1:
            # Report the supported type names (not the presence booleans)
            raise ValueError('Command-line skimming only available with input types '
                             + ','.join(skimmingInputTypes))
        flags.Derivation.skimmingExpression = runArgs.skimmingExpression
        if not hasattr(runArgs, 'skimmingContainers'):
            logDerivation.warning('All containers used for skimming must be listed with the skimmingContainers option - job likely to fail')
        else:
            flags.Derivation.dynamicConsumers = runArgs.skimmingContainers

    # Output files: every 'output...File' run argument (except the
    # '...Type' companions and NTUP_PHYSVAL) becomes an Output.<X>FileName
    # flag plus an Output.doWrite<X> switch
    for runArg in dir(runArgs):
        if 'output' in runArg and 'File' in runArg and 'Type' not in runArg and 'NTUP_PHYSVAL' not in runArg:
            streamFileKey = runArg.removeprefix("output")
            flags.addFlag(f'Output.{streamFileKey}Name', getattr(runArgs, runArg))
            flags.addFlag(f'Output.doWrite{streamFileKey.removesuffix("File")}', True)
            flags.Output.doWriteDAOD = True

    # Recover an unknown MC campaign from the first run number, if possible
    from Campaigns.Utils import Campaign, campaign_runs
    if flags.Input.isMC and flags.Input.MCCampaign is Campaign.Unknown:
        if flags.Input.RunNumbers:
            mc_campaign = campaign_runs.get(flags.Input.RunNumbers[0], Campaign.Unknown)
            if mc_campaign is not Campaign.Unknown:
                flags.Input.MCCampaign = mc_campaign
                logDerivation.info('Will recover MC campaign to: %s', mc_campaign.value)

    # MC channel number for EVNT input: an explicit runNumber argument wins;
    # otherwise fall back to the first run number when the channel is unset
    if selectedInputType == 'EVNT':
        if hasattr(runArgs, 'runNumber') and flags.Input.MCChannelNumber != runArgs.runNumber:
            logDerivation.warning('Got different MC channel number (%d) from runNumber than from metadata (%d)', runArgs.runNumber, flags.Input.MCChannelNumber)
            flags.Input.MCChannelNumber = runArgs.runNumber
        elif flags.Input.MCChannelNumber == 0 and flags.Input.RunNumbers and flags.Input.RunNumbers[0] != 0:
            logDerivation.info('Will recover MC channel number to: %d', flags.Input.RunNumbers[0])
            flags.Input.MCChannelNumber = flags.Input.RunNumbers[0]
        else:
            logDerivation.info('MC channel number: %d', flags.Input.MCChannelNumber)

    # Pre-include
    processPreInclude(runArgs, flags)

    # Pre-exec
    processPreExec(runArgs, flags)

    # Apply any remaining command-line flag overrides, then freeze the flags
    flags.fillFromArgs()
    flags.lock()

    # Main services
    from AthenaConfiguration.MainServicesConfig import MainServicesCfg
    cfg = MainServicesCfg(flags)

    # POOL input reading
    from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
    cfg.merge(PoolReadCfg(flags))

    # EVNT input additionally gets the metadata service with IOVDbMetaDataTool
    if selectedInputType == 'EVNT':
        from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
        cfg.merge(MetaDataSvcCfg(flags, ['IOVDbMetaDataTool']))

    # One configuration per requested format, looked up as '<format>Cfg'
    for formatName in formats:
        derivationConfig = getattr(DerivationConfigList, f'{formatName}Cfg')
        cfg.merge(derivationConfig(flags))

    # Pass-through mode: strip the skimming tools from every derivation kernel
    if hasattr(runArgs, 'passThrough'):
        logDerivation.info('Pass-through mode was requested. All events will be written to the output.')
        for algo in cfg.getEventAlgos():
            if isinstance(algo, CompFactory.DerivationFramework.DerivationKernel):
                algo.SkimmingTools = []

    # AMI tag
    from PyUtils.AMITagHelperConfig import AMITagCfg
    cfg.merge(AMITagCfg(flags, runArgs, fixBroken=True))

    # Generator versioning fix, MC only
    if flags.Input.isMC:
        from GeneratorConfig.Versioning import GeneratorVersioningFixCfg
        cfg.merge(GeneratorVersioningFixCfg(flags))

    # TagInfo: real data with a known year also records 'data_year'
    from EventInfoMgt.TagInfoMgrConfig import TagInfoMgrCfg
    if not flags.Input.isMC and flags.Input.DataYear > 0:
        cfg.merge(TagInfoMgrCfg(flags, tagValuePairs={
            "data_year": str(flags.Input.DataYear)
        }))
    else:
        cfg.merge(TagInfoMgrCfg(flags))

    # Post-include
    processPostInclude(runArgs, flags, cfg)

    # Post-exec
    processPostExec(runArgs, flags, cfg)

    # Logging levels are applied last
    from AthenaConfiguration.Utils import setupLoggingLevels
    setupLoggingLevels(flags, cfg)

    # Run the job and propagate success/failure as the process exit status
    sc = cfg.run()
    sys.exit(not sc.isSuccess())