16def fromRunArgs(runArgs):
17 from AthenaCommon.Logging import logging
18 logDerivation = logging.getLogger('Derivation')
19 logDerivation.info('****************** STARTING DERIVATION *****************')
20
21 logDerivation.info('**** Transformation run arguments')
22 logDerivation.info(str(runArgs))
23
24 logDerivation.info('**** Setting-up configuration flags')
25 from AthenaConfiguration.AllConfigFlags import initConfigFlags
26 flags = initConfigFlags()
27 flags.Exec.EventPrintoutInterval = 100
28 commonRunArgsToFlags(runArgs, flags)
29
30 flags.Common.ProductionStep = ProductionStep.Derivation
31
32
33 from PerfMonComps.PerfMonConfigHelpers import setPerfmonFlagsFromRunArgs
34 setPerfmonFlagsFromRunArgs(flags, runArgs)
35
36
37 allowedInputTypes = [ 'AOD', 'DAOD_PHYS', 'DAOD_PHYSLITE', 'EVNT' ]
38 availableInputTypes = [ hasattr(runArgs, f'input{inputType}File') for inputType in allowedInputTypes ]
39 if sum(availableInputTypes) != 1:
40 raise ValueError(
'Input must be exactly one of the following types: '+
','.join(
map(str, availableInputTypes)))
41 idx = availableInputTypes.index(True)
42 flags.Input.Files = getattr(runArgs, f'input{allowedInputTypes[idx]}File')
43
44
45
46
47 if hasattr(runArgs, 'augmentations'):
48 for val in runArgs.augmentations:
49 if ':' not in val or len(val.split(':')) != 2:
50 logDerivation.error('Derivation job started, but with wrong augmentation syntax - aborting')
51 raise ValueError('Invalid augmentation argument: {0}'.format(val))
52 else:
53 child, parent = val.split(':')
54 flags.addFlag(f'Output.DAOD_{child}ParentStream',f'DAOD_{parent}')
55 childStreamFlag = f'Output.DAOD_{parent}ChildStream'
56 if not flags.hasFlag(childStreamFlag):
57 flags.addFlag(childStreamFlag, [f'DAOD_{child}'])
58 else:
59 flags._set(childStreamFlag, flags._get(childStreamFlag) + [f'DAOD_{child}'])
60 logDerivation.info('Setting up event augmentation as {0} => {1}'.format(child, parent))
61
62
63 formats = []
64 if hasattr(runArgs, 'formats'):
65 formats = runArgs.formats
66 logDerivation.info('Will attempt to make the following derived formats: {0}'.format(formats))
67 else:
68 logDerivation.error('Derivation job started, but with no output formats specified - aborting')
69 raise ValueError('No derived formats specified')
70
71
72 if hasattr(runArgs, 'skimmingExpression'):
73 if runArgs.skimmingExpression:
74 if not (len(formats)==1 and formats[0]=='SKIM'):
75 raise ValueError('Command-line skimming only available with SKIM format')
76 availableInputTypes = [ hasattr(runArgs, f'input{inputType}File') for inputType in [ 'DAOD_PHYS', 'DAOD_PHYSLITE' ] ]
77 if sum(availableInputTypes) != 1:
78 raise ValueError(
'Command-line skimming only available with input types '+
','.join(
map(str, availableInputTypes)))
79 flags.Derivation.skimmingExpression = runArgs.skimmingExpression
80 if not hasattr(runArgs, 'skimmingContainers'):
81 logDerivation.warning('All containers used for skimming must be listed with the skimmingContainers option - job likely to fail')
82 else:
83 flags.Derivation.dynamicConsumers = runArgs.skimmingContainers
84
85
86 for runArg in dir(runArgs):
87 if 'output' in runArg and 'File' in runArg and 'Type' not in runArg and 'NTUP_PHYSVAL' not in runArg:
88 outputFileName = getattr(runArgs, runArg)
89 flagString = f'Output.{runArg.removeprefix("output")}Name'
90 flags.addFlag(flagString, outputFileName)
91 flags.addFlag(f'Output.doWrite{runArg.removeprefix("output").removesuffix("File")}', True)
92 flags.Output.doWriteDAOD = True
93
94
95 from Campaigns.Utils import Campaign, campaign_runs
96 if flags.Input.isMC and flags.Input.MCCampaign is Campaign.Unknown:
97 if flags.Input.RunNumbers:
98 mc_campaign = campaign_runs.get(flags.Input.RunNumbers[0], Campaign.Unknown)
99
100 if mc_campaign is not Campaign.Unknown:
101 flags.Input.MCCampaign = mc_campaign
102 logDerivation.info('Will recover MC campaign to: %s', mc_campaign.value)
103
104
105 processPreInclude(runArgs, flags)
106
107
108 processPreExec(runArgs, flags)
109
110
111 flags.fillFromArgs()
112
113
114 flags.lock()
115
116
117 from AthenaConfiguration.MainServicesConfig import MainServicesCfg
118 cfg = MainServicesCfg(flags)
119
120
121 from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
122 cfg.merge(PoolReadCfg(flags))
123
124
125 if (allowedInputTypes[idx]=='EVNT'):
126 from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
127 cfg.merge(MetaDataSvcCfg(flags, ['IOVDbMetaDataTool']))
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145 def premerge (cfg, newcfg, seqnam):
146
147 seq = cfg.getSequence(seqnam)
148 if not seq: return
149 newseq = newcfg.getSequence(seqnam)
150 if not newseq: return
151
152
153
154 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
155 ca = ComponentAccumulator()
156 ca.addSequence (CompFactory.AthSequencer (seqnam, Sequential=True))
157
158
159 for a in seq.Members:
160 ca.addEventAlgo (cfg.popEventAlgo (a.getName(), seqnam), seqnam)
161
162
163 cfg.getSequence('AthAlgSeq').Members.remove (seq)
164
165
166 newcfg.merge (ca)
167 return
168
169 for formatName in formats:
170 derivationConfig = getattr(DerivationConfigList, f'{formatName}Cfg')
171 newcfg = derivationConfig(flags)
172 premerge (cfg, newcfg, 'EventCleanSeq')
173 premerge (cfg, newcfg, 'EventCleanLockSeq')
174 cfg.merge(newcfg)
175
176
177 if hasattr(runArgs, 'passThrough'):
178 logDerivation.info('Pass-through mode was requested. All events will be written to the output.')
179 for algo in cfg.getEventAlgos():
180 if isinstance(algo, CompFactory.DerivationFramework.DerivationKernel):
181 algo.SkimmingTools = []
182
183
184 from PyUtils.AMITagHelperConfig import AMITagCfg
185 cfg.merge(AMITagCfg(flags, runArgs, fixBroken=True))
186
187
188 if flags.Input.isMC:
189 from GeneratorConfig.Versioning import GeneratorVersioningFixCfg
190 cfg.merge(GeneratorVersioningFixCfg(flags))
191
192
193 if not flags.Input.isMC and flags.Input.DataYear > 0:
194 from EventInfoMgt.TagInfoMgrConfig import TagInfoMgrCfg
195 cfg.merge(TagInfoMgrCfg(flags, tagValuePairs={
196 "data_year": str(flags.Input.DataYear)
197 }))
198 else:
199 from EventInfoMgt.TagInfoMgrConfig import TagInfoMgrCfg
200 cfg.merge(TagInfoMgrCfg(flags))
201
202
203 processPostInclude(runArgs, flags, cfg)
204
205
206 processPostExec(runArgs, flags, cfg)
207
208
209 from AthenaConfiguration.Utils import setupLoggingLevels
210 setupLoggingLevels(flags, cfg)
211
212
213 sc = cfg.run()
214 sys.exit(not sc.isSuccess())