import sys, json, traceback, re, argparse
import pyAMI.client   # needs the pyAMI environment sourced as described in the usage text below
parser = argparse.ArgumentParser(add_help=True, usage=
    '\n\nThis utility creates a json argument dictionary and prints instructions for running a Tier-0 job with it.'
    '\nThis script shall be used only with reconstruction-related amitags (x/f/c flavours).'
    '\nAMI tag, dataset and file names are required input arguments.'
    '\n1. source AMI from the command line:'
    '\n   source /afs/cern.ch/atlas/software/tools/pyAMI/setup.sh'
    '\n2. set up an encrypted configuration file to be able to connect to AMI with your AMI username and password, type:'
    '\n3. run the script:'
    '\n   CreateTierZeroArgdict.py [-h] [--maxEvents MAXEVENTS] [--ncores NCORES]'
    '\n                            [--NoMergeTypeList NOMERGETYPELIST]'
    '\n                            amitag dsname inputflname')
parser.add_argument("amitag", type=str, help="specify the AMI tag")
parser.add_argument("dsname", type=str, help="specify the dataset name")
parser.add_argument("inputflname", type=str, help="specify the input file name")
parser.add_argument("--maxEvents", dest='maxEvents', type=int, default=20,
                    help='specify maximum number of events to process (default: %d)' % 20)
parser.add_argument('--ncores', dest='ncores', type=int, default=1,
                    help='specify number of cores for the athenaMP job (default: %s)' % 1)
parser.add_argument('--NoMergeTypeList', dest='NoMergeTypeList', type=str,
                    default='AOD,ESD,DRAW_EGZ,DRAW_EMU,DRAW_TAUMUH,DRAW_ZMUMU',
                    help='specify comma-separated list of output types which you do not want '
                         'to be merged on the node when running in multicore mode (default: %s)'
                         % 'AOD,ESD,DRAW_EGZ,DRAW_EMU,DRAW_TAUMUH,DRAW_ZMUMU')
args = parser.parse_args()
amitag = args.amitag
dsname = args.dsname
inputflname = args.inputflname
maxEvents = args.maxEvents
ncores = args.ncores
NoMergeTypeList = args.NoMergeTypeList
ami = pyAMI.client.Client(endpoint=amiendpoint)
command = ['GetAMITagInfo', '-amiTag=%s' % tag]
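# 'GetAMITagInfo' asks AMI for the configuration stored under the given tag;
# the reply is unpacked into a plain dict below.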
if amiuser and amipwd:
    amires = ami.execute(command, format='dict_object', AMIUser=amiuser, AMIPass=amipwd)
else:
    amires = ami.execute(command, format='dict_object')
r = amires.get_rows('amiTagInfo')[0]
res = {'outputs'        : eval(r.get('outputs', '{}')),
       'inputs'         : eval(r.get('inputs', '{}')),
       'phconfig'       : eval(r.get('phconfig', '{}')),
       'moreInfo'       : eval(r.get('moreInfo', '{}')),
       'transformation' : str(r.get('transformation', '')),
       'trfsetupcmd'    : str(r.get('trfsetupcmd', ''))}
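# The outputs/inputs/phconfig/moreInfo fields are stored in AMI as strings
# holding python literals, hence the eval() calls above.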
tokens = dsfionstr.split()
if tokens[0] == 'lambda':
    # the dsnamefion string is itself a python lambda expression
    dsfion = eval(dsfionstr)
    newdsname = dsfion(dsname)
elif tokens[0] == 'replace':
    pairs = tokens[1:]
    newdsname = dsname
    while len(pairs) > 1:
        newdsname = newdsname.replace(pairs[0], pairs[1])
        pairs = pairs[2:]
elif tokens[0] == 'replace_nth':
    # pairs are (index, replacement) for the dot-separated dataset name parts
    # (zero-based indexing assumed)
    parts = dsname.split('.')
    pairs = tokens[1:]
    while len(pairs) > 1:
        parts[int(pairs[0])] = pairs[1]
        pairs = pairs[2:]
    newdsname = '.'.join(parts)
else:
    print("ERROR >>> unexpected dsnamefion syntax")
    raise Exception('error')
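# Worked example (hypothetical values): a dsnamefion of "replace RAW ESD"
# applied to dsname "data15_cos.00123456.physics_Main.RAW" yields
# "data15_cos.00123456.physics_Main.ESD" via the 'replace' branch above.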
if __name__ == '__main__':
    partIDlist = inputflname.split('._')[1:]
    if len(partIDlist) != 0:
        # strip the trailing file extension from the last partition identifier
        partIDlist[-1] = partIDlist[-1].split('.')[0]
        partID = '._' + '._'.join(partIDlist)
    else:
        print("WARNING >>> input file name does not have the expected format - no partition identifiers separated by '._' (as in lbXXXX._SFO-X._XXXX) found in the input file name; '._lb0000._SFO-0._0000' will be used instead for the log file name")
        partID = '._lb0000._SFO-0._0000'
    logfilename = dsname + "." + amitag + "." + taskstep + partID + ".job.log"
    jsonflname  = dsname + "." + amitag + "." + taskstep + partID + ".job.argdict.json"
    phconfig = ami_tag_content.get('phconfig', {})
    phconfig['maxEvents'] = maxEvents
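    # maxEvents caps the number of events the local test job processes
    # (default 20, see the --maxEvents option above).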
    for mergetype in NoMergeTypeList.split(","):
        if "athenaMPMergeTargetSize" in phconfig:
            phconfig["athenaMPMergeTargetSize"].update({mergetype: 0.0})
        else:
            phconfig["athenaMPMergeTargetSize"] = {mergetype: 0.0}
    parncores = '--nprocs=%i' % ncores
    phconfig['athenaopts'] = [parncores]
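    # '--nprocs=N' is handed to athena via the 'athenaopts' argument and
    # starts the job with N athenaMP worker processes.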
    jobargdict = {}
    jobargdict.update(phconfig)
    if 'inputBSFile' in ami_tag_content.get('inputs', {}):
        if not ami_tag_content.get('inputs', {})['inputBSFile']:
            jobargdict['inputBSFile'] = [dsname + '#' + inputflname]
    else:
        print("ERROR - unrecognized inputs in AMI tag; the script works only for the following AMI tag configuration of the inputs:\n 'inputs': {'inputBSFile': {}}")
    outputs = ami_tag_content.get('outputs', {})
    dstype = dsname.split('.')[-1]
    for k, v in outputs.items():
        dstp = v.get('dstype')
        if dstp == '!likeinput':
            # the output dataset type follows the type of the input dataset
            dstp = dstype
        elif dstp.startswith('!replace'):
            # assumed '!replace <old> <new> ...' convention: derive the output
            # type from the input type via pairwise string replacements
            pairs = dstp.split()[1:]
            dstp = dstype
            while len(pairs) > 1:
                dstp = dstp.replace(pairs[0], pairs[1])
                pairs = pairs[2:]
        ifmatch = v.get('ifMatch', None)
        if ifmatch is not None:
            m = re.match(ifmatch, dsname)
        dsfionstr = v.get('dsnamefion', None)
        pcs = dsname.split('.')
        if len(pcs) < 5:
            print("Dataset name must have at least 5 parts separated by dots to comply with the Tier-0 naming convention: <project>.<runnr>.<stream>.<granularity>.<type>")
        pcs.append(str(amitag))
        outdsname = '.'.join(pcs)
        outdatasets[parname] = outdsname
    for parname in outdatasets.keys():
        if not parname.startswith('_'):
            outdsname = outdatasets[parname]
            outfilename = outdsname + partID
            outfilevalue = outdsname + '#' + outfilename
            jobargdict[parname] = outfilevalue
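    # Parameter names starting with '_' are presumably internal bookkeeping
    # entries; only the remaining ones become transform arguments.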
    with open(jsonflname, "w") as jobargdictfile:
        json.dump(jobargdict, jobargdictfile, sort_keys=True, indent=4, separators=(',', ': '))
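    # The argument dictionary is written as pretty-printed json; the transform
    # is later invoked with --argJSON=<this file>, see the instructions below.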
    print("####################################################################")
    print("#\n# To run a local job:")
    print("#\n# 1. Copy the following files into a directory from which you intend to run your job:\n# %s\n# %s " % (jsonflname, inputflname))
    print("#\n# 2. Set up the environment: \n# source " + ami_tag_content.get('trfsetupcmd', ''))
    print("#\n# 3. Run the job: \n# python -u `which " + ami_tag_content.get('transformation', '') + "` --argJSON=" + jsonflname + " &> " + logfilename)
    print("####################################################################")
    print("#\n# Side note: to copy input files from EOS (if still present there) one can usually use commands like:")
    print("# xrdcp -f --nopbar root://eosatlas.cern.ch//eos/atlas/atlastier0/rucio/%s/%s/%s/%s/%s %s"
          % (dsname.split(".")[0], dsname.split(".")[2], dsname.split(".")[1], dsname, inputflname, inputflname))
    print("#\n# OR from castor disk - assuming the file is STAGED:")
    print("# export STAGE_SVCCLASS=t0atlas")
    print("# rfcp /castor/cern.ch/grid/atlas/rucio/raw/%s/%s/%s/%s/%s ."
          % (dsname.split(".")[0], dsname.split(".")[2], dsname.split(".")[1], dsname, inputflname))
    print("####################################################################")