ATLAS Offline Software
Loading...
Searching...
No Matches
trfAMI.py
Go to the documentation of this file.
1# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2
7
8
9
10import ast
11import json
12import os
13import traceback
14from json import dumps
15
16import logging
17msg = logging.getLogger(__name__)
18
19from PyJobTransforms.trfExceptions import TransformAMIException
20from PyJobTransforms.trfDefaultFiles import getInputFileName, getOutputFileName
21from PyJobTransforms.trfUtils import convertToStr
22
23from PyJobTransforms.trfExitCodes import trfExit
24AMIerrorCode=trfExit.nameToCode('TRF_AMI_ERROR')
25
26
27
class TrfConfig:
    """Stores the configuration of a transform.

    Instances are plain records filled in by the tag lookup functions.

    Attributes:
        name: transform name (e.g. a ``*_tf.py`` script name).
        release: release string, printed after ``asetup``.
        physics: dict of non-file arguments.
        inFiles: dict of input file arguments.
        outFiles: dict of output file arguments.
        outputs: AMI ``outputs`` payload, printed verbatim by ``__str__``.
        inDS: example input dataset name, or None.
        outfmts: list of possible output data types.
        newTransform: True when the arguments must be rendered in the
            ``--key value`` command line style.
    """

    def __init__(self):
        self.name = None
        self.release = None
        self.physics = {}
        self.inFiles = {}
        self.outFiles = {}
        self.outputs = {}
        self.inDS = None
        self.outfmts = []
        self.newTransform = False

    def __iter__(self):
        """Yield ``(key, value)`` pairs of all arguments.

        Input files, output files and physics arguments are merged in that
        order, so on a key clash the later group wins.
        """
        merged = dict(self.inFiles)
        merged.update(self.outFiles)
        merged.update(self.physics)
        yield from merged.items()

    def __str__(self):
        """Return a human readable, multi-line description of the transform."""
        string = 'asetup ' + self.release + '\n' + self.name
        string += self._argsToString(self.physics) + '\n'

        string += '\nInput file arguments:\n'
        if self.inFiles:
            string += self._argsToString(self.inFiles) + '\n'
        if self.inDS:
            string += '\nExample input dataset: ' + self.inDS + '\n'

        string += '\nOutput file arguments:\n'
        if self.outFiles:
            string += self._argsToString(self.outFiles) + '\n'

        string += '\nAMI outputs:\n'
        if self.outputs != {}:
            # outputs defaults to a dict, so it must be stringified before
            # concatenation (dict + str raises TypeError).
            string += str(self.outputs) + '\n'

        if self.outfmts:
            string += '\nPossible output data types: ' + str(self.outfmts) + '\n'
        return string

    @staticmethod
    def _quoted(value):
        """Return str(value) in single quotes, embedded single quotes backslash-escaped."""
        return "'" + str(value).replace("'", "\\'") + "'"

    def _argsToString(self, adict):
        """Render an argument dictionary as a command line fragment.

        Args:
            adict: mapping of argument name to value (scalar, list/tuple or
                substep dictionary).

        Returns:
            A string in which every argument is preceded by a single space.
        """
        string = ''
        for k, v in adict.items():
            if not self.newTransform:
                # Old style transforms take plain key='value' pairs
                string += " " + k + "=" + self._quoted(v)
                continue

            if not k.startswith('--'):
                k = "--" + k
            # Now the tricky bit: we have to hackishly massage back to a CLI
            # value depending on the value type and possibly the key type
            if isinstance(v, dict):
                # Should be a substep argument
                string += " " + k
                if 'Exec' in k:  # preExec, postExec
                    for vk, vv in v.items():
                        string += " " + _parseExecDict(vk, vv)
                elif 'Include' in k:  # preInclude, postInclude
                    for vk, vv in v.items():
                        string += " " + _parseIncludeDict(vk, vv)
                else:
                    # Misc substep string/number argument...?
                    # Be careful with the triggerConfig, which is a bit
                    # special in the separator
                    separator = '=' if 'triggerConfig' in k else ':'
                    for vk, vv in v.items():
                        string += " " + vk + separator + str(vv)
            elif isinstance(v, (list, tuple)):
                # Elements are passed through str() so that non-string
                # entries do not break join()/replace().
                if "athenaopts" in k:
                    # athenaopts are special - space separated
                    string += " " + k + "=" + self._quoted(" ".join(str(element) for element in v))
                elif "Exec" in k or "Include" in k:
                    # Special intermediate treatment for pre/postExec from prodsys
                    string += " " + k + " " + " ".join(self._quoted(element) for element in v)
                else:
                    string += " " + k + "=" + " ".join(self._quoted(element) for element in v)
            else:
                # Assume some vanilla value
                string += " " + k + " " + self._quoted(v)
        return string
114
115
def _parseExecDict(substep, value):
    """Back convert a pre/postExec dictionary entry into command line strings.

    Args:
        substep: substep name used as the prefix of every element.
        value: iterable of exec fragments for that substep.

    Returns:
        The concatenation of `` 'substep:fragment'`` for every fragment (each
        piece starts with a space; single quotes inside a fragment are
        backslash-escaped). Empty string when *value* is empty.
    """
    return "".join(
        " '" + substep + ":" + str(bit).replace("'", "\\'") + "'"
        for bit in value
    )
122
123
def _parseIncludeDict(substep, value, joinWithChar = ","):
    """Back convert a pre/postInclude dictionary entry into a command line string.

    Args:
        substep: substep name used as the prefix.
        value: iterable of strings (include file names) for that substep.
        joinWithChar: separator placed between the elements of *value*.

    Returns:
        ``'substep:elem1<sep>elem2...'`` wrapped in single quotes; single
        quotes inside the joined elements are backslash-escaped.
    """
    joined = joinWithChar.join(value).replace("'", "\\'")
    return "'" + substep + ":" + joined + "'"
130
def isNewAMITag(tag):
    """Tell whether *tag* is a "new" AMI tag.

    A tag is new when its leading letter is listed in the threshold table and
    its numeric part is strictly greater than the threshold for that letter.

    Args:
        tag: tag string, one letter followed by a number (e.g. ``r6383``).

    Returns:
        True for a new tag, False otherwise (including unknown letters).

    Raises:
        IndexError: if *tag* is empty.
        ValueError: if the letter is known but the remainder is not an integer.
    """
    # Highest "old" tag number per tag letter
    newTagDict = {
        'a' : 764,
        'b' : 545,
        'c' : 864,
        'd' : 1351,
        'e' : 3764,
        'f' : 557,
        'g' : 46,
        'h' : 32,
        'j' : 46,
        'k' : 34,
        'm' : 1377,
        'o' : 4741,
        'p' : 2295,
        'q' : 430,
        'r' : 6382,
        's' : 2559,
        't' : 597,
        'u' : 51,
        'v' : 139,
        'w' : 501,
        'x' : 302,
    }

    threshold = newTagDict.get(tag[0])
    if threshold is not None and int(tag[1:]) > threshold:
        msg.debug('it is a new tag')
        return True

    msg.debug('it is NOT a new tag')
    return False
163
164
165
167 def __init__(self, tag, suppressNonJobOptions = True):
168 self._tag=tag
170 self._isProdSys=None
171 self._trfs=None
172 self._suppressNonJobOptions = suppressNonJobOptions
173
174 @property
175 def isProdSys(self):
176 if self._isProdSys is None:
177 if self._isNewTag:
178 #probably false, as we need to get stuff from ami
179 self._isProdSys = False
180 else:
181 prodtags=getProdSysTagsCharacters()
182 if self._tag[0] in prodtags:
183 self._isProdSys=True
184 else:
185 self._isProdSys=False
186 return self._isProdSys
187
188
189 @property
190 def trfs(self):
191 if self._trfs is None:
192 if self.isProdSys:
194 else:
196 return self._trfs
197
198
199 def __str__(self):
200 string = '\nInformation about tag '+self._tag+':\n'
201
202 if self.isProdSys:
203 string +='This is a ProdSys tag. Input and output file arguments are likely to be missing because they are often not part of the tag definition.\n'
204 else:
205 string +='This is a T0 tag.\n'
206
207 string +='This tag consists of ' + str(len(self.trfs)) + ' transform command(s).\n'
208 string += 'Transform commands follow below.\n'
209 string += 'Input and output file names (if present) are only suggestions.\n'
210
211 for trf in self.trfs:
212 string+='\n'+str(trf)+'\n'
213
214 return string
215
216
217
def getAMIClient(endpoints = ['atlas-replica','atlas']):
    """Get an AMI client.

    Args:
        endpoints: endpoint names handed to ``pyAMI.client.Client``. The
            default list is only read, never modified.

    Returns:
        A ``pyAMI.client.Client`` created with ``ignore_proxy = True``.

    Raises:
        TransformAMIException: if the pyAMI modules cannot be imported.
    """
    msg.debug('Getting AMI client...')

    # Imported lazily so that the module loads without pyAMI installed
    try:
        from pyAMI.client import Client
    except ImportError:
        raise TransformAMIException(AMIerrorCode, 'Import of pyAMI modules failed.')

    msg.debug("Attempting to get AMI client for endpoints {0}".format(endpoints))
    return Client(endpoints, ignore_proxy = True)
233
234
def getProdSysTagsCharacters():
    """Get list of characters of ProdSys tags.

    Returns:
        A new list of the single-letter tag prefixes regarded as ProdSys tags.
    """
    # Due to move to uniform tag definition in AMI this list is now frozen
    # So just hard code it
    msg.debug('Getting list of ProdSys tag characters...')

    return ['z', 'p', 'e', 's', 'd', 'r', 't', 'a', 'b', 'w']
244
245
246
def getPANDAClient():
    """Get PANDA client.

    Returns:
        A cursor on a new ``cx_Oracle`` connection to the PANDA database.

    Raises:
        TransformAMIException: if ``cx_Oracle`` cannot be imported or the
            connection cannot be established.
    """
    msg.debug('Getting PANDA client...')
    try:
        import cx_Oracle
    except ImportError:
        raise TransformAMIException(AMIerrorCode, 'Import of cx_Oracle failed (is Oracle setup on this machine?).')

    try:
        # NOTE(review): the account and password are hard coded in the
        # connection string; consider moving them to configuration.
        cur = cx_Oracle.connect('atlas_grisli_r/panda_c10@adcr_panda').cursor()
    except Exception:
        msg.debug('An exception occurred while connecting to PANDA database: %s', traceback.format_exc())
        raise TransformAMIException(AMIerrorCode, 'Failed to get PANDA client connection (N.B. this does not work from outside CERN).')

    return cur
262
263
def ReadablePANDA(s):
    """Un-escape information from PANDA.

    Args:
        s: string containing PANDA ``%XX`` escape codes.

    Returns:
        *s* with each known escape code replaced by its plain text.
    """
    # Applied in this order, one code after the other
    substitutions = (
        ('%0B', ' '),
        ('%9B', '; '),
        ('%8B', '"'),
        ('%3B', ';'),
        ('%2C', ','),
        ('%2B', '+'),
    )
    for code, plain in substitutions:
        s = s.replace(code, plain)
    return s
267
268
269
273
def getTrfConfigFromPANDA(tag):
    """Get information about a ProdSys tag from PANDA.

    Args:
        tag: tag string; the first character selects the ``tag`` column and
            the remainder is converted to an integer for the ``cid`` column.

    Returns:
        A list of TrfConfig objects, one per transform step of the tag. The
        output formats are attached to the last one.

    Raises:
        TransformAMIException: if the query fails, the tag is unknown or the
            returned row is inconsistent.
    """
    msg.debug('Using PANDA to get info about tag %s', tag)

    try:
        pandaclient=getPANDAClient()
        # Only a single character and an int()-validated number are
        # interpolated into the statement.
        pandaclient.execute("select trf,trfv,lparams,vparams,formats,cache from t_trf_config where tag='%s' and cid=%d" %(tag[:1],int(tag[1:]) ) )
        result=pandaclient.fetchone()
    except Exception:
        msg.info('An exception occurred: %s', traceback.format_exc())
        raise TransformAMIException(AMIerrorCode, 'Getting tag info from PANDA failed.')

    if result is None:
        raise TransformAMIException(AMIerrorCode, 'Tag %s not found in PANDA database' % tag)

    msg.debug('Raw data returned from panda DB is:' + os.linesep + str(result))

    trfn = result[0].split(',')
    msg.debug('List of transforms: %s', trfn)
    trfv = result[1].split(',')
    msg.debug('List of releases: %s', trfv)
    lparams = result[2].split(';')
    msg.debug('List of arguments: %s', lparams)
    vparams = result[3].split(';')
    msg.debug('List of argument values: %s', vparams)
    formats = result[4].split('.')
    msg.debug('List of formats: %s', formats)
    cache = result[5].split(',')
    msg.debug('List of caches: %s', cache)

    if not ( len(trfn) == len(trfv) == len(lparams) == len(vparams) ):
        raise TransformAMIException(AMIerrorCode, 'Inconsistency in number of trfs.')

    # Cache can be a single value, in which case it needs replicated for other
    # transform steps, or it can be multivalued - great schema design guys :-(
    if len(cache) != len(trfv):
        if len(cache) == 1:
            cache = cache * len(trfv)
        else:
            raise TransformAMIException(AMIerrorCode, 'Inconsistency in number of caches entries vs. release numbers ({0}; {1}).'.format(cache, trfv))

    listOfTrfs = []

    # All five lists have the same length at this point
    for trfName, trfRelease, trfCache, argNames, argValues in zip(trfn, trfv, cache, lparams, vparams):

        trf = TrfConfig()
        trf.name = trfName
        trf.newTransform = '_tf.py' in trf.name
        trf.release = trfRelease + "," + trfCache

        keys = argNames.split(',')
        values = argValues.split(',')

        if len(keys) != len(values):
            raise TransformAMIException(AMIerrorCode, 'Inconsistency in number of arguments.')

        physics = dict( (k, ReadablePANDA(v) ) for (k,v) in zip(keys, values))
        # Hack to correct trigger keys being stored with spaces in panda
        # ('triggerConfig' also matches 'triggerConfigByRun')
        for k, v in list(physics.items()):
            if 'triggerConfig' in k:
                if ' ' in v:
                    physics[k] = v.replace(' ', ',')
                    msg.warning('Attempted to correct illegal trigger configuration string: {0} -> {1}'.format(v, physics[k]))
            if 'Exec' in k:
                # Mash up to a list, where %8C is used as the quote delimitation character
                physics[k] = [ execElement.replace("%8C", "") for execElement in v.split("%8C %8C") ]

        msg.debug("Checking for pseudo-argument internal to ProdSys...")
        if 'extraParameter' in physics:
            val = physics.pop('extraParameter')
            msg.debug("Removed extraParamater=%s from arguments.", val)

        msg.debug("Checking for input/output file arguments...")
        for arg in list(physics):
            if arg.lstrip('-').startswith('input') and arg.endswith('File'):
                value = physics.pop(arg)
                msg.debug("Found input file argument %s=%s.", arg, value )
                trf.inFiles[arg] = value
            elif arg.lstrip('-').startswith('output') and arg.endswith('File'):
                value = physics.pop(arg)
                msg.debug("Found output file argument %s=%s.", arg, value )
                trf.outFiles[arg] = value

        msg.debug("Checking for not set arguments...")
        # Iterate over a snapshot: popping while iterating the live dict
        # raises "dictionary changed size during iteration".
        for arg, value in list(physics.items()):
            if value == "NONE" or value == "none" or value == ["NONE"]:
                val = physics.pop(arg)
                msg.debug("Removed %s=%s from arguments.", arg, val )

        trf.physics = physics

        listOfTrfs.append(trf)

    listOfTrfs[0].inDS = None # not yet implemented
    listOfTrfs[-1].outfmts = formats

    return listOfTrfs
375
376
377'''
378directly copied from pyAMI.atlas 5 API
379should be removed once suppressNonJobOptions is in official release
380'''
def get_ami_tag(client, tag, suppressNonJobOptions = True):
    '''Get AMI-tag information.

    Args:
        :client: the pyAMI client [ pyAMI.client.Client ]
        :tag: the AMI-tag [ str ]
        :suppressNonJobOptions: when true, add the -suppressNonJobOptions
            switch to the AMI command [ bool ]

    Returns:
        an array of python dictionaries.
    '''

    command = [
        'AMIGetAMITagInfo',
        '-amiTag="%s"' % tag,
        '-cached',
    ]

    if suppressNonJobOptions:
        command.append('-suppressNonJobOptions')

    msg.debug(command)

    reply = client.execute(command, format = 'dict_object')
    return reply.get_rows('amiTagInfo')
404
def remove_enclosing_quotes(s):
    """Strip one pair of matching enclosing quotes from *s*, if present.

    Args:
        s: usually a string; any other value is returned unchanged when it
            cannot be indexed or sliced.

    Returns:
        *s* without its first and last element when both are the same quote
        character (``"`` or ``'``), otherwise *s* itself.
    """
    try:
        if s[0] == s[-1] and s[0] in ('"', "'"):
            s = s[1:-1]
    except Exception:
        # Deliberately best effort: empty strings and non-sequence values
        # are handed back untouched.
        pass
    return s
412
413
def getTrfConfigFromAMI(tag, suppressNonJobOptions = True):
    """Get information about a T0 tag from AMI.

    Args:
        tag: the AMI tag string.
        suppressNonJobOptions: forwarded to get_ami_tag(); when true the
            bookkeeping keys ('inputs', 'outputs', 'productionStep',
            'transformation', 'SWReleaseCache') are also dropped from the
            physics arguments.

    Returns:
        A list holding a single TrfConfig built from the first row of the
        AMI reply.

    Raises:
        TransformAMIException: if pyAMI cannot be imported, the AMI query
            fails, or the reply cannot be parsed.
    """
    msg.debug('Using AMI to get info about tag %s', tag)

    try:
        import pyAMI.exception
    except ImportError as e:
        raise TransformAMIException(AMIerrorCode, 'Import of pyAMI modules failed ({0})'.format(e))

    try:
        amiclient = getAMIClient()
        result = get_ami_tag(amiclient, tag, suppressNonJobOptions)
    except pyAMI.exception.Error as e:
        msg.error('An exception occured when connecting to primary AMI: {0}'.format(e))
        msg.debug('Exception: {0}'.format(e))
        errText = str(e)
        if 'please login' in errText or 'certificate expired' in errText:
            raise TransformAMIException(AMIerrorCode, 'Getting tag info from AMI failed with credential problem. '
                                        'Please check your AMI account status.')
        if 'Invalid amiTag' in errText:
            raise TransformAMIException(AMIerrorCode, 'Invalid AMI tag ({0}).'.format(tag))
        raise TransformAMIException(AMIerrorCode, 'Getting tag info from AMI failed. See logfile for exception details.')

    try:
        trf = TrfConfig()
        trf.name = result[0]['transformation']
        trf.inputs = result[0].get('inputs', {})
        trf.outputs = result[0].get('outputs', {})
        trf.release = result[0]['SWReleaseCache'].replace('_', ',')

        if 'phconfig' in result[0]:
            trf.physics = deserialiseFromAMIString(result[0]['phconfig'])
        else:
            physics = {}
            for k, v in result[0].items():
                if 'Exec' in k:
                    execStrList = convertToStr(v).replace('" "', '"" ""').split('" "')
                    physics[convertToStr(k)] = [remove_enclosing_quotes(execStr).replace('\\"', '"') for execStr in execStrList]
                elif '" "' in v:
                    msg.info('found a quoted space (" ") in parameter value for %s, converting to list', k)
                    subStrList = convertToStr(v).replace('" "', '"" ""').split('" "')
                    physics[convertToStr(k)] = [remove_enclosing_quotes(subStr).replace('\\"', '"') for subStr in subStrList]
                else:
                    physics[convertToStr(k)] = convertToStr(remove_enclosing_quotes(v))

            msg.debug('Result from AMI after string cleaning:')
            msg.debug('%s', dumps(physics, indent = 4))

            if suppressNonJobOptions:
                for k in ('inputs', 'outputs', 'productionStep', 'transformation', 'SWReleaseCache'):
                    physics.pop(k, None)

            # Hack to correct trigger keys being stored with spaces
            # ('triggerConfig' also matches 'triggerConfigByRun')
            for k, v in list(physics.items()):
                if 'triggerConfig' in k:
                    if ' ' in v:
                        physics[k] = v.replace(' ', ',')
                        msg.warning('Attempted to correct illegal trigger configuration string: {0} -> {1}'.format(v, physics[k]))

            msg.debug("Checking for pseudo-argument internal to ProdSys...")
            if 'extraParameter' in physics:
                val = physics.pop('extraParameter')
                msg.debug("Removed extraParamater=%s from arguments.", val)

            msg.debug("Checking for input/output file arguments...")
            for arg in list(physics):
                if arg.lstrip('-').startswith('input') and arg.endswith('File'):
                    value = physics.pop(arg)
                    msg.debug("Found input file argument %s=%s.", arg, value)
                    trf.inFiles[arg] = value
                elif arg.lstrip('-').startswith('output') and arg.endswith('File'):
                    value = physics.pop(arg)
                    msg.debug("Found output file argument %s=%s.", arg, value)
                    trf.outFiles[arg] = value

            msg.debug("Checking for not set arguments...")
            # Iterate over a snapshot: popping while iterating the live dict
            # raises "dictionary changed size during iteration".
            for arg, value in list(physics.items()):
                if value == "NONE" or value == "none" or value == ["NONE"]:
                    val = physics.pop(arg)
                    msg.debug("Removed %s=%s from arguments.", arg, val)

            trf.physics = physics

        if not isinstance(trf.physics, dict):
            raise TransformAMIException(AMIerrorCode, "Bad result for tag's phconfig: {0}".format(trf.physics))

        if trf.inFiles == {}:
            if 'inputs' in result[0]:
                trf.inFiles = deserialiseFromAMIString(result[0]['inputs'])
                for inFileType, inFileName in trf.inFiles.items():
                    # Not all AMI tags actually have a working filename, so fallback to trfDefaultFiles
                    # if necessary
                    if inFileName == '' or inFileName == {} or inFileName == [] or inFileName == '{}':
                        trf.inFiles[inFileType] = getInputFileName(inFileType, tag)

        if 'outputs' in result[0]:
            outputs = deserialiseFromAMIString(result[0]['outputs'])
            trf.outFiles = dict( (k, getOutputFileName(k.removeprefix('output').removesuffix('File')) ) for k in outputs )
            trf.outfmts = [ outputs[k]['dstype'] for k in outputs ]
    except TransformAMIException:
        # Already a meaningful transform error: do not let the blanket
        # handler below re-label it as a "very unexpected exception".
        raise
    except KeyError as e:
        raise TransformAMIException(AMIerrorCode, "Missing key in AMI data: {0}".format(e))
    except Exception as e:
        raise TransformAMIException(AMIerrorCode, "Got a very unexpected exception while parsing AMI outputs!"
                                    " Please report.\nParsing:\n{0}\nRaised:\n{1}".format(result, e))

    # Now fix up for command line in the case of a new transform:
    trf.newTransform = '_tf.py' in trf.name

    return [ trf ]
528
529
530
def deserialiseFromAMIString(amistring):
    """Convert from a string to a python object.

    The string is decoded as JSON first; when that fails it is evaluated as
    a Python literal.

    Args:
        amistring: serialised value as delivered by AMI.

    Returns:
        The decoded Python object.

    Raises:
        TransformAMIException: if the string is neither valid JSON nor a
            valid Python literal.
    """
    try:
        return json.loads(amistring)
    except ValueError as e_json:
        msg.debug("Failed to decode {0} as JSON: {1}".format(amistring, e_json))

    try:
        return ast.literal_eval(amistring)
    except (SyntaxError, ValueError, TypeError):
        # literal_eval reports malformed input with ValueError or TypeError
        # as well as SyntaxError.
        errMsg = "Failed to deserialise AMI string '{0}' using JSON or eval".format(amistring)
        msg.error(errMsg)
        raise TransformAMIException(AMIerrorCode, errMsg)
Stores the information about a given tag.
Definition trfAMI.py:166
__init__(self, tag, suppressNonJobOptions=True)
Definition trfAMI.py:167
Stores the configuration of a transform.
Definition trfAMI.py:28
_argsToString(self, adict)
Definition trfAMI.py:69
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition hcg.cxx:310
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition hcg.cxx:130
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
Module for transform exit codes.
Transform utility functions.
ReadablePANDA(s)
Un-escape information from PANDA.
Definition trfAMI.py:265
_parseIncludeDict(substep, value, joinWithChar=",")
Back convert a pre/postInclude dictionary into a set of command line compatible strings By default us...
Definition trfAMI.py:126
getProdSysTagsCharacters()
Get list of characters of ProdSys tags.
Definition trfAMI.py:236
getTrfConfigFromPANDA(tag)
Get information about a ProdSys tag from PANDA.
Definition trfAMI.py:272
getPANDAClient()
Get PANDA client.
Definition trfAMI.py:248
deserialiseFromAMIString(amistring)
Convert from a string to a python object.
Definition trfAMI.py:533
_parseExecDict(substep, value)
Back convert a pre/postExec dictionary into a set of command line compatible strings.
Definition trfAMI.py:117
getAMIClient(endpoints=['atlas-replica', 'atlas'])
Get an AMI client.
Definition trfAMI.py:222
getTrfConfigFromAMI(tag, suppressNonJobOptions=True)
Get information about a T0 tag from AMI.
Definition trfAMI.py:416
get_ami_tag(client, tag, suppressNonJobOptions=True)
Definition trfAMI.py:381
remove_enclosing_quotes(s)
Definition trfAMI.py:405
isNewAMITag(tag)
Definition trfAMI.py:131