def fromRunArgs(runArgs):
    """Configure and run a derivation job from transform run arguments.

    The function builds the configuration flags from ``runArgs``, locks
    them, assembles the component accumulator for every requested derived
    format, runs the job and finally terminates the process through
    ``sys.exit`` with a status derived from the job's status code.

    Args:
        runArgs: Object whose attributes carry the transform arguments.
            Attributes read here (all optional unless noted):
            ``input<TYPE>File`` (exactly one of AOD, DAOD_PHYS,
            DAOD_PHYSLITE, EVNT is required), ``formats`` (required),
            ``augmentations``, ``skimmingExpression``,
            ``skimmingContainers``, ``output*File``, ``runNumber`` and
            ``passThrough``.

    Raises:
        ValueError: If not exactly one supported input type is given, an
            augmentation is not of the form ``child:parent``, no formats
            are given, or command-line skimming is requested with an
            unsupported format or input type.
        AttributeError: If a requested format has no ``<format>Cfg``
            attribute in ``DerivationConfigList``.
        SystemExit: Always, once the job has run.
    """
    from AthenaCommon.Logging import logging
    logDerivation = logging.getLogger('Derivation')
    logDerivation.info('****************** STARTING DERIVATION *****************')

    logDerivation.info('**** Transformation run arguments')
    logDerivation.info(str(runArgs))

    logDerivation.info('**** Setting-up configuration flags')
    from AthenaConfiguration.AllConfigFlags import initConfigFlags
    flags = initConfigFlags()
    flags.Exec.EventPrintoutInterval = 100
    commonRunArgsToFlags(runArgs, flags)

    flags.Common.ProductionStep = ProductionStep.Derivation

    # Transfer the PerfMon-related run arguments to the flags.
    from PerfMonComps.PerfMonConfigHelpers import setPerfmonFlagsFromRunArgs
    setPerfmonFlagsFromRunArgs(flags, runArgs)

    # Input type: exactly one of the supported input<TYPE>File arguments
    # must be present.
    allowedInputTypes = ['AOD', 'DAOD_PHYS', 'DAOD_PHYSLITE', 'EVNT']
    availableInputTypes = [hasattr(runArgs, f'input{inputType}File')
                           for inputType in allowedInputTypes]
    if sum(availableInputTypes) != 1:
        # Report the type names, not the list of presence booleans.
        raise ValueError('Input must be exactly one of the following types: '
                         + ','.join(allowedInputTypes))
    inputType = allowedInputTypes[availableInputTypes.index(True)]
    isEVNT = inputType == 'EVNT'
    flags.Input.Files = getattr(runArgs, f'input{inputType}File')

    # Augmentations, given as "child:parent". A parent accumulates a list of
    # child streams, while each child records a single parent stream.
    if hasattr(runArgs, 'augmentations'):
        for val in runArgs.augmentations:
            parts = val.split(':')
            if len(parts) != 2:
                logDerivation.error('Derivation job started, but with wrong augmentation syntax - aborting')
                raise ValueError(f'Invalid augmentation argument: {val}')
            child, parent = parts
            flags.addFlag(f'Output.DAOD_{child}ParentStream', f'DAOD_{parent}')
            childStreamFlag = f'Output.DAOD_{parent}ChildStream'
            if not flags.hasFlag(childStreamFlag):
                flags.addFlag(childStreamFlag, [f'DAOD_{child}'])
            else:
                flags._set(childStreamFlag, flags._get(childStreamFlag) + [f'DAOD_{child}'])
            logDerivation.info(f'Setting up event augmentation as {child} => {parent}')

    # Output formats (mandatory).
    if not hasattr(runArgs, 'formats'):
        logDerivation.error('Derivation job started, but with no output formats specified - aborting')
        raise ValueError('No derived formats specified')
    formats = runArgs.formats
    logDerivation.info(f'Will attempt to make the following derived formats: {formats}')

    # Command-line skimming: only for the single SKIM format and only when
    # reading one of the DAOD input types listed below.
    if hasattr(runArgs, 'skimmingExpression') and runArgs.skimmingExpression:
        if not (len(formats) == 1 and formats[0] == 'SKIM'):
            raise ValueError('Command-line skimming only available with SKIM format')
        skimmingInputTypes = ['DAOD_PHYS', 'DAOD_PHYSLITE']
        availableSkimmingInputs = [hasattr(runArgs, f'input{skimType}File')
                                   for skimType in skimmingInputTypes]
        if sum(availableSkimmingInputs) != 1:
            # Report the type names, not the list of presence booleans.
            raise ValueError('Command-line skimming only available with input types '
                             + ','.join(skimmingInputTypes))
        flags.Derivation.skimmingExpression = runArgs.skimmingExpression
        if not hasattr(runArgs, 'skimmingContainers'):
            logDerivation.warning('All containers used for skimming must be listed with the skimmingContainers option - job likely to fail')
        else:
            flags.Derivation.dynamicConsumers = runArgs.skimmingContainers

    # Output files: every output*File argument (except *Type* and
    # NTUP_PHYSVAL ones) gets a name flag and a doWrite flag.
    for runArg in dir(runArgs):
        if 'output' in runArg and 'File' in runArg and 'Type' not in runArg and 'NTUP_PHYSVAL' not in runArg:
            streamArg = runArg.removeprefix("output")
            flags.addFlag(f'Output.{streamArg}Name', getattr(runArgs, runArg))
            flags.addFlag(f'Output.doWrite{streamArg.removesuffix("File")}', True)
            flags.Output.doWriteDAOD = True

    # Recover the MC campaign from the first run number when the input
    # reports it as unknown.
    from Campaigns.Utils import Campaign, campaign_runs
    if flags.Input.isMC and flags.Input.MCCampaign is Campaign.Unknown and flags.Input.RunNumbers:
        mc_campaign = campaign_runs.get(flags.Input.RunNumbers[0], Campaign.Unknown)
        if mc_campaign is not Campaign.Unknown:
            flags.Input.MCCampaign = mc_campaign
            logDerivation.info('Will recover MC campaign to: %s', mc_campaign.value)

    # MC channel number for EVNT input: an explicit runNumber argument wins
    # over the metadata; otherwise fall back to the first run number when
    # the metadata value is 0.
    if isEVNT:
        if hasattr(runArgs, 'runNumber') and flags.Input.MCChannelNumber != runArgs.runNumber:
            logDerivation.warning('Got different MC channel number (%d) from runNumber than from metadata (%d)', runArgs.runNumber, flags.Input.MCChannelNumber)
            flags.Input.MCChannelNumber = runArgs.runNumber
        elif flags.Input.MCChannelNumber == 0 and flags.Input.RunNumbers and flags.Input.RunNumbers[0] != 0:
            logDerivation.info('Will recover MC channel number to: %d', flags.Input.RunNumbers[0])
            flags.Input.MCChannelNumber = flags.Input.RunNumbers[0]
        else:
            logDerivation.info('MC channel number: %d', flags.Input.MCChannelNumber)

    # Pre-include
    processPreInclude(runArgs, flags)

    # Pre-exec
    processPreExec(runArgs, flags)

    # Pick up flag settings from the command-line arguments
    flags.fillFromArgs()

    # No flag changes beyond this point
    flags.lock()

    # Main services
    from AthenaConfiguration.MainServicesConfig import MainServicesCfg
    cfg = MainServicesCfg(flags)

    # Pool file reading
    from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
    cfg.merge(PoolReadCfg(flags))

    # EVNT input additionally gets the metadata service with the
    # IOVDbMetaDataTool.
    if isEVNT:
        from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
        cfg.merge(MetaDataSvcCfg(flags, ['IOVDbMetaDataTool']))

    def premerge(cfg, newcfg, seqnam):
        """Move the sequence ``seqnam`` from ``cfg`` into ``newcfg``.

        Does nothing unless both accumulators already contain a sequence
        called ``seqnam``. Otherwise the algorithms of ``cfg``'s sequence
        are popped into a temporary accumulator holding a sequential
        ``AthSequencer`` of the same name, the old sequence is removed from
        ``AthAlgSeq`` and the temporary accumulator is merged into
        ``newcfg``.
        NOTE(review): presumably this keeps the later cfg.merge(newcfg) from
        seeing the same sequence on both sides - confirm.
        """
        seq = cfg.getSequence(seqnam)
        if not seq:
            return
        newseq = newcfg.getSequence(seqnam)
        if not newseq:
            return

        from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
        ca = ComponentAccumulator()
        ca.addSequence(CompFactory.AthSequencer(seqnam, Sequential=True))

        # Iterate over a snapshot: popEventAlgo is called on the very
        # sequence being walked, and mutating a list while iterating over
        # it skips elements.
        for a in list(seq.Members):
            ca.addEventAlgo(cfg.popEventAlgo(a.getName(), seqnam), seqnam)

        # Drop the old sequence from the top-level algorithm sequence.
        cfg.getSequence('AthAlgSeq').Members.remove(seq)

        newcfg.merge(ca)

    # Build and merge the configuration of every requested format.
    for formatName in formats:
        try:
            derivationConfig = getattr(DerivationConfigList, f'{formatName}Cfg')
        except AttributeError:
            # Log before aborting, like the other failure paths above.
            logDerivation.error('Unknown derived format requested: %s', formatName)
            raise
        newcfg = derivationConfig(flags)
        premerge(cfg, newcfg, 'EventCleanSeq')
        premerge(cfg, newcfg, 'EventCleanLockSeq')
        cfg.merge(newcfg)

    # Pass-through mode: strip the skimming tools from every derivation
    # kernel. NOTE(review): this keys on the presence of the argument, not
    # on its value - confirm that is intended.
    if hasattr(runArgs, 'passThrough'):
        logDerivation.info('Pass-through mode was requested. All events will be written to the output.')
        for algo in cfg.getEventAlgos():
            if isinstance(algo, CompFactory.DerivationFramework.DerivationKernel):
                algo.SkimmingTools = []

    # AMI tag handling
    from PyUtils.AMITagHelperConfig import AMITagCfg
    cfg.merge(AMITagCfg(flags, runArgs, fixBroken=True))

    # Generator versioning fix, MC only
    if flags.Input.isMC:
        from GeneratorConfig.Versioning import GeneratorVersioningFixCfg
        cfg.merge(GeneratorVersioningFixCfg(flags))

    # TagInfoMgr: for data with a known year, also pass the data_year tag.
    from EventInfoMgt.TagInfoMgrConfig import TagInfoMgrCfg
    if not flags.Input.isMC and flags.Input.DataYear > 0:
        cfg.merge(TagInfoMgrCfg(flags, tagValuePairs={
            "data_year": str(flags.Input.DataYear)
        }))
    else:
        cfg.merge(TagInfoMgrCfg(flags))

    # Post-include
    processPostInclude(runArgs, flags, cfg)

    # Post-exec
    processPostExec(runArgs, flags, cfg)

    # Apply the logging levels requested through the flags
    from AthenaConfiguration.Utils import setupLoggingLevels
    setupLoggingLevels(flags, cfg)

    # Run the job; the exit code is 0 on success and 1 otherwise.
    sc = cfg.run()
    sys.exit(not sc.isSuccess())