14 from json
import dumps
17 msg = logging.getLogger(__name__)
19 from PyJobTransforms.trfExceptions
import TransformAMIException
20 from PyJobTransforms.trfDefaultFiles
import getInputFileName, getOutputFileName
24 AMIerrorCode=trfExit.nameToCode(
'TRF_AMI_ERROR')
44 for (k,v)
in theDict.items():
51 string +=
'\nInput file arguments:\n'
55 string +=
'\nExample input dataset: '+ self.
inDS +
'\n'
57 string +=
'\nOutput file arguments:\n'
61 string +=
'\nAMI outputs:\n'
66 string +=
'\nPossible output data types: '+
str(self.
outfmts) +
'\n'
71 for (k,v)
in adict.items():
73 if not k.startswith(
'--'):
78 if isinstance(v, dict):
82 for vk, vv
in v.items():
86 for vk, vv
in v.items():
91 if 'triggerConfig' in k:
96 for vk, vv
in v.items():
97 string +=
" " + vk + separator +
str(vv)
98 elif isinstance(v, (list, tuple)):
100 if "athenaopts" in k:
101 string +=
" " + k +
"=" +
"'" +
" ".
join(v).
replace(
"'",
"\\'") +
"'"
102 elif "Exec" in k
or "Include" in k:
104 string +=
" " + k +
" " +
" ".
join([
"'"+element.replace(
"'",
"\\'")+
"'" for element
in v])
106 string +=
" " + k +
"=" +
" ".
join([
"'" + element.replace(
"'",
"\\'") +
"'" for element
in v])
109 string +=
" "+k+
" "+
"'"+
str(v).
replace(
"'",
"\\'")+
"'"
111 string +=
" "+k+
"="+
"'"+
str(v).
replace(
"'",
"\\'")+
"'"
120 string +=
" '" + substep +
":" +
str(bit).
replace(
"'",
"\\'")+
"'"
128 string += substep +
":" + joinWithChar.join(value).
replace(
"'",
"\\'")+
"'"
156 if tag[0]
in newTagDict:
157 if int(tag[1:]) > newTagDict[tag[0]]:
158 msg.debug(
'it is a new tag')
161 msg.debug(
'it is NOT a new tag')
167 def __init__(self, tag, suppressNonJobOptions = True):
182 if self.
_tag[0]
in prodtags:
191 if self.
_trfs is None:
200 string =
'\nInformation about tag '+self.
_tag+
':\n'
203 string +=
'This is a ProdSys tag. Input and output file arguments are likely to be missing because they are often not part of the tag definition.\n'
205 string +=
'This is a T0 tag.\n'
207 string +=
'This tag consists of ' +
str(len(self.
trfs)) +
' transform command(s).\n'
208 string +=
'Transform commands follow below.\n'
209 string +=
'Input and output file names (if present) are only suggestions.\n'
211 for trf
in self.
trfs:
212 string+=
'\n'+
str(trf)+
'\n'
223 msg.debug(
'Getting AMI client...')
226 from pyAMI.client
import Client
228 raise TransformAMIException(AMIerrorCode,
'Import of pyAMI modules failed.')
230 msg.debug(
"Attempting to get AMI client for endpoints {0}".
format(endpoints))
231 amiclient =
Client(endpoints, ignore_proxy =
True)
239 msg.debug(
'Getting list of ProdSys tag characters...')
241 defaultList=[
'z',
'p',
'e',
's',
'd',
'r',
't',
'a',
'b',
'w']
249 msg.debug(
'Getting PANDA client...')
253 raise TransformAMIException(AMIerrorCode,
'Import of cx_Oracle failed (is Oracle setup on this machine?).')
256 cur = cx_Oracle.connect(
'atlas_grisli_r/panda_c10@adcr_panda').
cursor()
258 msg.debug(
'An exception occurred while connecting to PANDA database: %s', traceback.format_exc())
259 raise TransformAMIException(AMIerrorCode,
'Failed to get PANDA client connection (N.B. this does not work from outside CERN).')
274 msg.debug(
'Using PANDA to get info about tag %s', tag)
278 pandaclient.execute(
"select trf,trfv,lparams,vparams,formats,cache from t_trf_config where tag='%s' and cid=%d" %(tag[:1],
int(tag[1:]) ) )
279 result=pandaclient.fetchone()
281 msg.info(
'An exception occurred: %s', traceback.format_exc())
282 raise TransformAMIException(AMIerrorCode,
'Getting tag info from PANDA failed.')
285 raise TransformAMIException(AMIerrorCode,
'Tag %s not found in PANDA database' % tag)
287 msg.debug(
'Raw data returned from panda DB is:' + os.linesep +
str(result))
289 trfn=result[0].
split(
',')
290 msg.debug(
'List of transforms: %s', trfn)
291 trfv=result[1].
split(
',')
292 msg.debug(
'List of releases: %s', trfv)
293 lparams=result[2].
split(
';')
294 msg.debug(
'List of arguments: %s', lparams)
295 vparams=result[3].
split(
';')
296 msg.debug(
'List of argument values: %s', vparams)
297 formats=result[4].
split(
'.')
298 msg.debug(
'List of formats: %s', formats)
299 cache=result[5].
split(
',')
300 msg.debug(
'List of caches: %s', formats)
303 if not ( len(trfn) == len(trfv) == len(lparams) == len(vparams) ):
304 raise TransformAMIException(AMIerrorCode,
'Inconsistency in number of trfs.')
308 if len(cache) != len(trfv):
310 cache = cache * len(trfv)
312 raise TransformAMIException(AMIerrorCode,
'Inconsistency in number of caches entries vs. release numbers ({0}; {1}).'.
format(cache, trfv))
316 for iTrf
in range(len(trfn)):
320 if '_tf.py' in trf.name:
321 trf.newTransform=
True
323 trf.newTransform=
False
324 trf.release=trfv[iTrf] +
"," + cache[iTrf]
326 keys=lparams[iTrf].
split(
',')
327 values=vparams[iTrf].
split(
',')
329 if ( len(keys) != len(values) ):
330 raise TransformAMIException(AMIerrorCode,
'Inconsistency in number of arguments.')
332 physics = dict( (k,
ReadablePANDA(v) )
for (k,v)
in zip(keys, values))
334 for k, v
in physics.items():
335 if 'triggerConfig' in k
or 'triggerConfigByRun' in k:
337 physics[k] = v.replace(
' ',
',')
338 msg.warning(
'Attempted to correct illegal trigger configuration string: {0} -> {1}'.
format(v, physics[k]))
341 physics[k] = [ execElement.replace(
"%8C",
"")
for execElement
in v.split(
"%8C %8C") ]
343 msg.debug(
"Checking for pseudo-argument internal to ProdSys...")
344 if 'extraParameter' in physics:
345 val=physics.pop(
'extraParameter')
346 msg.debug(
"Removed extraParamater=%s from arguments.", val)
348 msg.debug(
"Checking for input/output file arguments...")
349 for arg
in list(physics):
350 if arg.lstrip(
'-').startswith(
'input')
and arg.endswith(
'File'):
351 value=physics.pop(arg)
352 msg.debug(
"Found input file argument %s=%s.", arg, value )
353 trf.inFiles[arg]=value
354 elif arg.lstrip(
'-').startswith(
'output')
and arg.endswith(
'File'):
355 value=physics.pop(arg)
356 msg.debug(
"Found output file argument %s=%s.", arg, value )
357 trf.outFiles[arg]=value
359 msg.debug(
"Checking for not set arguments...")
360 for arg,value
in physics.items():
361 if value==
"NONE" or value==
"none" or value==[
"NONE"]:
363 msg.debug(
"Removed %s=%s from arguments.", arg, val )
367 listOfTrfs.append(trf)
371 listOfTrfs[0].inDS=
None
372 listOfTrfs[-1].outfmts=formats
378 directly copied from pyAMI.atlas 5 API
379 should be removed once suppressNonJobOptions is in official release
382 '''Get AMI-tag information.
385 :client: the pyAMI client [ pyAMI.client.Client ]
386 :tag: the AMI-tag [ str ]
389 an array of python dictionnaries.
394 '-amiTag="%s"' % tag,
398 if suppressNonJobOptions:
399 command += [
'-suppressNonJobOptions']
403 return client.execute(command, format =
'dict_object').get_rows(
'amiTagInfo')
407 if s[0] == s[-1]
and s[0]
in (
'"',
"'"):
417 msg.debug(
'Using AMI to get info about tag %s', tag)
421 import pyAMI.exception
422 except ImportError
as e:
423 raise TransformAMIException(AMIerrorCode,
'Import of pyAMI modules failed ({0})'.
format(e))
428 result =
get_ami_tag(amiclient, tag, suppressNonJobOptions)
429 except pyAMI.exception.Error
as e:
430 msg.warning(
'An exception occured when connecting to primary AMI: {0}'.
format(e))
431 msg.debug(
'Exception: {0}'.
format(e))
432 if 'please login' in str(e)
or 'certificate expired' in str(e):
433 raise TransformAMIException(AMIerrorCode,
'Getting tag info from AMI failed with credential problem. '
434 'Please check your AMI account status.')
435 if 'Invalid amiTag' in str(e):
436 raise TransformAMIException(AMIerrorCode,
'Invalid AMI tag ({0}).'.
format(tag))
438 msg.debug(
"Error may not be fatal - will try AMI replica catalog")
442 trf.name = result[0][
'transformation']
443 trf.inputs=result[0].
get(
'inputs', {})
444 trf.outputs=result[0].
get(
'outputs', {})
445 trf.release = result[0][
'SWReleaseCache'].
replace(
'_',
',')
447 if 'phconfig' in result[0]:
451 for k, v
in result[0].
items():
456 msg.info(
'found a quoted space (" ") in parameter value for %s, converting to list', k)
462 msg.debug(
'Result from AMI after string cleaning:')
463 msg.debug(
'%s', dumps(physics, indent = 4))
465 if suppressNonJobOptions:
466 for k
in list(physics):
467 if k
in [
'inputs',
'outputs',
'productionStep',
'transformation',
'SWReleaseCache']:
470 for k, v
in physics.items():
471 if 'triggerConfig' in k
or 'triggerConfigByRun' in k:
473 physics[k] = v.replace(
' ',
',')
474 msg.warning(
'Attempted to correct illegal trigger configuration string: {0} -> {1}'.
format(v, physics[k]))
476 msg.debug(
"Checking for pseudo-argument internal to ProdSys...")
477 if 'extraParameter' in physics:
478 val = physics.pop(
'extraParameter')
479 msg.debug(
"Removed extraParamater=%s from arguments.", val)
481 msg.debug(
"Checking for input/output file arguments...")
482 for arg
in list(physics):
483 if arg.lstrip(
'-').startswith(
'input')
and arg.endswith(
'File'):
484 value = physics.pop(arg)
485 msg.debug(
"Found input file argument %s=%s.", arg, value)
486 trf.inFiles[arg] = value
487 elif arg.lstrip(
'-').startswith(
'output')
and arg.endswith(
'File'):
488 value = physics.pop(arg)
489 msg.debug(
"Found output file argument %s=%s.", arg, value)
490 trf.outFiles[arg] = value
492 msg.debug(
"Checking for not set arguments...")
493 for arg, value
in physics.items():
494 if value ==
"NONE" or value ==
"none" or value == [
"NONE"]:
495 val = physics.pop(arg)
496 msg.debug(
"Removed %s=%s from arguments.", arg, val)
498 trf.physics = physics
500 if not isinstance(trf.physics, dict):
501 raise TransformAMIException(AMIerrorCode,
"Bad result for tag's phconfig: {0}".
format(trf.physics))
503 if trf.inFiles == {}:
504 if 'inputs' in result[0]:
506 for inFileType, inFileName
in trf.inFiles.items():
509 if inFileName ==
'' or inFileName =={}
or inFileName == []
or inFileName ==
'{}':
512 if 'outputs' in result[0]:
514 trf.outFiles=dict( (k,
getOutputFileName(k.removeprefix(
'output').removesuffix(
'File')) )
for k
in outputs )
515 trf.outfmts=[ outputs[k][
'dstype']
for k
in outputs ]
516 except KeyError
as e:
517 raise TransformAMIException(AMIerrorCode,
"Missing key in AMI data: {0}".
format(e))
518 except Exception
as e:
519 raise TransformAMIException(AMIerrorCode,
"Got a very unexpected exception while parsing AMI outputs!"
520 " Please report.\nParsing:\n{0}\nRaised:\n{1}".
format(result, e))
523 if '_tf.py' in trf.name:
524 trf.newTransform=
True
526 trf.newTransform=
False
536 result = json.loads(amistring)
537 except ValueError
as e_json:
538 msg.debug(
"Failed to decode {0} as JSON: {1}".
format(amistring, e_json))
540 result = ast.literal_eval(amistring)
542 errMsg =
"Failed to deserialise AMI string '{0}' using JSON or eval".
format(amistring)
544 raise TransformAMIException(AMIerrorCode, errMsg)