ATLAS Offline Software
CreateTierZeroArgdict.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
4 #
5 # CreateTierZeroArgdict.py
6 #
7 # Utility to create json argument dictionary for a given AMI tag, filename and datasetname
8 #
9 # To see script usage, type:
10 # python CreateTierZeroArgdict.py -h
11 #
12 # Examples of usage: python CreateTierZeroArgdict.py f622 data15_13TeV.00284473.physics_Main.daq.RAW data15_13TeV.00284473.physics_Main.daq.RAW._lb0267._SFO-3._0001.data
13 # python CreateTierZeroArgdict.py --maxEvents 20 --ncores 4 --NoMergeTypeList AOD,ESD,DRAW_EGZ,DRAW_EMU,DRAW_TAUMUH,DRAW_ZMUMU f622 data15_13TeV.00284473.physics_Main.daq.RAW data15_13TeV.00284473.physics_Main.daq.RAW._lb0267._SFO-3._0001.data
14 
15 
16 import sys, json, traceback, re, argparse
17 
18 
# Command-line interface: three mandatory positional arguments (AMI tag,
# dataset name, input file name) plus optional job-tuning switches.
parser = argparse.ArgumentParser(add_help=True,usage='\n\nThis utility creates json argument dictionary and prints instructions for running a Tier-0 job with it. \
 \nThis script shall be used only with reconstruction related amitags (x/f/c flavours). \
 \nAMI tag, dataset and file names are required input arguments. \
 \n\
 \n1. source AMI from the command line:\
 \n source /afs/cern.ch/atlas/software/tools/pyAMI/setup.sh \
 \n2. set up an encrypted configuration file to be able to connect to AMI with your AMI username and password, type:\
 \n ami auth\
 \n3. run the script: \
 \n CreateTierZeroArgdict.py [-h] [--maxEvents MAXEVENTS] [--ncores NCORES] \
 \n [--NoMergeTypeList NOMERGETYPELIST] \
 \n amitag dsname inputflname')

# required positional arguments
parser.add_argument("amitag", type=str, help="specify amitag")
parser.add_argument("dsname", type=str, help="specify datasetname")
parser.add_argument("inputflname", type=str, help="specify filename")

# cap on the number of events processed by the local test job
parser.add_argument("--maxEvents",
                    dest='maxEvents', type=int, default=20,
                    help = 'specify maximum number of events to process (default: %d)' % 20)

# number of athenaMP worker processes (1 = serial athena job)
parser.add_argument('--ncores',
                    dest='ncores', type=int, default=1,
                    help='specify number of cores for athemaMP job (default %s)' % 1)

# output types excluded from on-node merging when running in multicore mode
parser.add_argument('--NoMergeTypeList',
                    dest='NoMergeTypeList', type=str, default='AOD,ESD,DRAW_EGZ,DRAW_EMU,DRAW_TAUMUH,DRAW_ZMUMU',
                    help='specify comma separated list of output types which you do not want to be merged on the node when running in multicore mode (default: %s)' % 'AOD,ESD,DRAW_EGZ,DRAW_EMU,DRAW_TAUMUH,DRAW_ZMUMU')

args = parser.parse_args()
51 
52 
# deferred import: pyAMI is only needed once the arguments have parsed cleanly
import pyAMI.client

#print ("args: ",args)

# positional arguments
amitag= args.amitag
dsname=args.dsname
inputflname=args.inputflname
#
# default values of optional arguments
maxEvents=args.maxEvents
ncores=args.ncores
NoMergeTypeList=args.NoMergeTypeList

# Tier-0 processing step label, used in output dataset / file / log names
taskstep= 'recon'
68 
69 
72 
def resolve(tag) :
    """Fetch the configuration of a single (non-chained) AMI tag.

    Runs the AMI 'GetAMITagInfo' command for *tag* and returns a dict with
    keys 'outputs', 'inputs', 'phconfig', 'moreInfo' (dicts) and
    'transformation', 'trfsetupcmd' (strings).  On any failure the traceback
    is printed and the string 'error' is returned instead of a dict.
    """
    try :
        # credentials left blank on purpose: pyAMI then falls back to the
        # user's encrypted configuration file (set up via 'ami auth')
        amiuser = ''
        amipwd = ''
        client = pyAMI.client.Client(endpoint = 'atlas')

        request = ['GetAMITagInfo', '-amiTag=%s' % tag]
        if amiuser and amipwd :
            reply = client.execute(request, format = 'dict_object', AMIUser = amiuser, AMIPass = amipwd)
        else :
            reply = client.execute(request, format = 'dict_object')

        # first row only: the tag is assumed to be simple, i.e. not chained
        row = reply.get_rows('amiTagInfo')[0]
        # TODO get rid of these empty strings/str/eval at some point
        # NOTE(review): eval() executes AMI-supplied text here; AMI is being
        # treated as a trusted source -- consider ast.literal_eval instead.
        res = {
            'outputs'        : eval(row.get('outputs', '{}')),
            'inputs'         : eval(row.get('inputs', '{}')),
            'phconfig'       : eval(row.get('phconfig', '{}')),
            'moreInfo'       : eval(row.get('moreInfo', '{}')),
            'transformation' : str(row.get('transformation', '')),
            'trfsetupcmd'    : str(row.get('trfsetupcmd', '')),
        }
    except Exception:
        traceback.print_exc()
        res = 'error'
    return res
101 
102 
def evalDatasetNameFion(dsname,dsfionstr):
    """Derive an output dataset name from *dsname* via the AMI 'dsnamefion'
    instruction string *dsfionstr*.

    Supported instruction forms (first whitespace token selects the mode):
      'lambda ...'            -- the whole string is evaluated as a Python
                                 lambda and applied to dsname
      'replace A B [C D ...]' -- pair-wise substring replacements
      'replace_nth I V [...]' -- replace the I-th '.'-separated field
                                 (I counts from 1) with V

    Raises Exception('error') for any other instruction keyword.
    """
    tokens = dsfionstr.split()
    keyword = tokens[0]
    params = tokens[1:]
    if keyword == 'lambda' :
        # NOTE(review): eval of an AMI-supplied expression; AMI content is
        # assumed trusted here.
        newdsname = eval(dsfionstr)(dsname)
    elif keyword == 'replace' :
        newdsname = dsname
        # consume (old, new) pairs; a dangling odd parameter is ignored
        for old, new in zip(params[0::2], params[1::2]) :
            newdsname = newdsname.replace(old, new)
    elif keyword == 'replace_nth' :
        fields = dsname.split('.')
        for pos, value in zip(params[0::2], params[1::2]) :
            fields[int(pos) - 1] = value   # positions count from 1
        newdsname = '.'.join(fields)
    else :
        print("ERROR >>> unexpected dsnamefion syntax")
        raise Exception('error')
    return newdsname
129 
130 
131 # Executed from the command line
132 if __name__ == '__main__':
133 
134  # extracting information about input file name partition ID for naming of the outputs/logfile/argdict
135  partID='._0001'
136  try:
137  partIDlist = inputflname.split('._')[1:]
138  partIDlist[-1]=partIDlist[-1].split('.')[0]
139  if len(partIDlist)!=0 :
140  if partIDlist[0]:
141  partID='._'+'._'.join(partIDlist)
142  else:
143  pass
144  else:
145  print("WARNING >>> input file name does not have the expected format - no partition identifiers separated by '._' found in the inputfilename lbXXXX._SFO-X._XXXX, ._lb0000._SFO-0._0000.job,<jobnr>.log will be used instead for the log file name")
146  pass
147  except Exception:
148  pass
149 
150  logfilename=dsname+"."+ amitag+"."+taskstep+partID+".job.log"
151  jsonflname=dsname+"."+ amitag+"."+taskstep+partID+".job.argdict.json"
152  jobargdict={}
153 
154  # resolve AMI tag
155  ami_tag_content = resolve(amitag)
156 
157  # translating AMI tag info into job argument dictionary
158 
159  # always adding/rewriting maxEvents field with the number of events passed as parameter to this script
160  phconfig=ami_tag_content.get('phconfig',{})
161  phconfig['maxEvents']=maxEvents
162 
163  # disabling merging of outputs on the node when running athenaMP
164  # must be specified as a comma separated list of types (parameter to this script)
165  if ncores>1:
166  for mergetype in NoMergeTypeList.split(","):
167  if "athenaMPMergeTargetSize" in phconfig:
168  phconfig["athenaMPMergeTargetSize"].update({mergetype:0.0})
169  else:
170  phconfig["athenaMPMergeTargetSize"]={mergetype:0.0}
171 
172  # adding multicore processing instruction in case ncores > 1
173  if ncores > 1 :
174  parncores='--nprocs=%i' % ncores
175  phconfig['athenaopts']=[parncores]
176 
177  jobargdict.update(phconfig)
178 
179  #checking if AMI tag contains empty 'inputBSFile' dictionary
180  if 'inputBSFile' in ami_tag_content.get('inputs',{}):
181  # if yes - writing the inputfile into the argument dictionary according to the Tier-0 nomenclature
182  if not ami_tag_content.get('inputs',{})['inputBSFile'] :
183  jobargdict['inputBSFile']=[dsname+'#'+inputflname]
184  else :
185  print ("ERROR - not recognized inputs in AMI tag; script works only for the following AMI tag configuration of the inputs \n 'inputs': {'inputBSFile': {}}")
186  sys.exit(1)
187 
188  # starting translating the output dataset names form the ifMatch expressions specified in the AMI tag
189  outputs= ami_tag_content.get('outputs',{})
190  dstype= dsname.split('.')[-1]
191  outdatasets={}
192 
193  for k,v in outputs.items() :
194  parname = k #v.get('parname')
195  dstp = v.get('dstype')
196  # handle meta cases
197  if dstp == '!likeinput' :
198  dstp = dstype
199  elif dstp.startswith('!replace') : # allow multiple, pair-wise replace
200  pcs = dstp.split()
201  pairs = pcs[1:]
202  dstp = dstype
203  while len(pairs) > 1 :
204  dstp = dstp.replace(pairs[0],pairs[1])
205  pairs = pairs[2:]
206 
207  ifmatch = v.get('ifMatch',None)
208  # skip this output if there is an ifMatch and it does not match
209  if ifmatch :
210  m = re.match(ifmatch,dsname)
211  if not m : continue
212 
213  dsfionstr = v.get('dsnamefion',None)
214  # interpret dsfion string
215  if dsfionstr :
216  outdsname = evalDatasetNameFion(dsname,dsfionstr)
217  else :
218  # apply default nomenclature
219  pcs=dsname.split('.')
220  if len(pcs)<5 :
221  print ("Dataset name must have at least 5 parts separated by comma to comply with Tier-0 naming convention: <project>.<runnr>.<stream>.<granularity>.<type>")
222  sys.exit(1)
223  pcs[3]=taskstep
224  pcs[4]=dstp
225  pcs.append(str(amitag))
226  outdsname='.'.join(pcs)
227 
228  #outdsname = outdsname+tasktag
229  #outdatasets[parname] = [outdsname,dstp]
230  outdatasets[parname] = outdsname
231 
232  # assembling the dictionary of outputs
233  for parname in outdatasets.keys() :
234  if not parname.startswith('_') :
235  outdsname=outdatasets[parname]
236  outfilename=outdsname+partID
237  outfilevalue=outdsname+'#'+outfilename
238  jobargdict[parname]=outfilevalue
239 
240  jobargdictfile = open(jsonflname,"w")
241  json.dump(jobargdict, jobargdictfile,sort_keys=True, indent=4, separators=(',', ': '))
242  jobargdictfile.close()
243 
244 
245  print ("####################################################################" )
246  print ("#\n# To run a local job:")
247  print ("#\n# 1. Copy the following files into a directory from which you intend to run your job:\n# %s\n# %s " %(jsonflname,inputflname))
248  print ("#\n# 2. Setup the environment: \n# source "+ ami_tag_content.get('trfsetupcmd',{}) )
249  print ("#\n# 3. Run the job: \n# python -u `which "+ami_tag_content.get('transformation',{})+"` --argJSON="+jsonflname+" &> "+logfilename )
250  print ("#")
251  print ("####################################################################" )
252  print ("#\n# Note aside: to copy inputfiles from EOS (if still present there) one can usually use commands like:")
253  print ("# xrdcp -f --nopbar root://eosatlas.cern.ch//eos/atlas/atlastier0/rucio/%s/%s/%s/%s/%s %s" % (dsname.split(".")[0],dsname.split(".")[2],dsname.split(".")[1],dsname,inputflname,inputflname))
254  print ("#\n# OR from castor disk - assuming the file is STAGED:")
255  print ("# export STAGE_SVCCLASS=t0atlas" )
256  print ("# rfcp /castor/cern.ch/grid/atlas/rucio/raw/%s/%s/%s/%s/%s ." % (dsname.split(".")[0],dsname.split(".")[2],dsname.split(".")[1],dsname,inputflname))
257  print ("#")
258  print ("####################################################################" )
259 
python.CreateTierZeroArgdict.evalDatasetNameFion
def evalDatasetNameFion(dsname, dsfionstr)
Definition: CreateTierZeroArgdict.py:103
python.CreateTierZeroArgdict.resolve
def resolve(tag)
utility functions
Definition: CreateTierZeroArgdict.py:73
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
Trk::open
@ open
Definition: BinningType.h:40
str
Definition: BTagTrackIpAccessor.cxx:11
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70
python.CreateTierZeroArgdict.int
int
Definition: CreateTierZeroArgdict.py:39
WriteBchToCool.update
update
Definition: WriteBchToCool.py:67
Trk::split
@ split
Definition: LayerMaterialProperties.h:38