ATLAS Offline Software
ELG_prun.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2 
3 
4 import os.path
5 import subprocess
6 import re
7 import shlex
8 import time
9 
10 # Force flushing print
11 # Since this script is executed within a TPython::Exec() function call in the PrunDriver class,
12 # if not forcing flushing then no printed messages in this script would be displayed to the user
13 # (unless an error is raised then the buffer would also be printed)
14 import functools
15 print = functools.partial(print, flush=True)
16 
17 
def _report_failure(command, error):
    """Print diagnostics for a command that failed to run successfully."""
    print("Command:")
    if isinstance(error, subprocess.CalledProcessError):
        print(error.cmd)
        print("failed with return code ", error.returncode)
        print("output was:")
        print(error.output)
    else:
        print(command)
        print("failed and output was:")
        print(error)


def _format_try_error(attempt, n_tries, command, error):
    """Build the error text recorded for one failed submission attempt."""
    rule = "-" * 60
    header = f"{rule}\niTry={attempt} out of nTries={n_tries}\n{rule}\nCommand:\n"
    if isinstance(error, subprocess.CalledProcessError):
        return (header
                + f"{error.cmd}\n"
                + f"failed with return code {error.returncode}\n"
                + "output was:\n"
                + f"{error.output}\n")
    return header + f"{command}\n" + "failed and output was:\n" + f"{error}"


def ELG_prun(sample):
    """Build a prun command line from the sample meta-data and submit it.

    The options are read from the ``nc_*`` entries of ``sample.meta()``.  If
    ``jobcontents.tgz`` does not exist yet it is created first with a
    ``--noSubmit`` invocation of prun; the actual submission then uses that
    tarball via ``--inTarBall``.

    Returns:
        The integer following ``TaskID=`` in the prun output on success,
        otherwise one of the following codes:

        * 99 - the ``pandatools`` package could not be imported
        * 1  - the tarball could not be created
        * 2  - the submission failed on every try
        * 3  - prun succeeded but no ``TaskID=<n>`` was found in its output
    """
    # Important: only return integer 1 if the creation of the tarball was
    # unsuccessful, as the PrunDriver relies on that to stop the submission.

    try:
        from pandatools import PandaToolsPkgInfo  # noqa: F401
    except ImportError:
        print("prun needs additional setup, try:")
        print(" lsetup panda")
        return 99

    cmd = ["prun"]

    # Valued options that can be set by the user
    opts = ['destSE',
            'site',
            'rootVer',
            'cmtConfig',
            'excludedSite',
            'nGBPerJob',
            'memory',
            'maxCpuCount',
            'nFiles',
            'nFilesPerJob',
            'nEventsPerJob',
            'nJobs',
            'maxFileSize',
            'maxNFilesPerJob',
            'addNthFieldOfInDSToLFN',
            'cpuTimePerEvent',
            'maxWalltime',
            'voms',
            'workingGroup',
            'tmpDir']

    # Boolean switches that can be set by the user
    switches = ['express',
                'noSubmit',
                'skipScout',
                'disableAutoRetry',
                'useNewCode',
                'official',
                'mergeOutput',
                'useRootCore',
                'useAthenaPackages',
                'avoidVP']

    # Options that should normally not be touched by the user
    internalOpts = ['exec',
                    'inDS',
                    'outDS',
                    'outputs',
                    'writeInputToTxt',
                    'match',
                    'framework']

    from ROOT import SH
    no_cast = SH.MetaObject.CAST_NOCAST_DEFAULT
    meta = sample.meta()

    using_nEventsPerJob = False
    for opt in opts:
        # -1 is the "not set as a number" marker; fall back to the string value
        number = meta.castDouble('nc_' + opt, -1, no_cast)
        if abs(number + 1) > 1e-6:
            cmd.append("--" + opt + "=" + str(int(round(number))))
            if opt == "nEventsPerJob":
                using_nEventsPerJob = True
        else:
            text = meta.castString('nc_' + opt)
            if len(text):
                cmd.append("--" + opt + "=" + text)

    # nGBPerJob and nEventsPerJob are incompatible to prun
    if using_nEventsPerJob:
        cmd = [x for x in cmd if "nGBPerJob" not in x]
        print(cmd)

    for switch in switches:
        if meta.castDouble('nc_' + switch, 0, no_cast) != 0:
            cmd.append("--" + switch)
        else:
            text = meta.castString('nc_' + switch)
            # Any non-empty string other than these spellings of false enables the switch
            if len(text) and text not in ("False", "false", "FALSE"):
                cmd.append("--" + switch)

    for opt in internalOpts:
        value = meta.castString('nc_' + opt)
        if opt == "exec" and using_nEventsPerJob:
            value += " %SKIPEVENTS %MAXEVENTS"
        cmd.append("--" + opt + "=" + value)

    # Only set the merge script when merging has not been switched off
    merge_disabled = (meta.castDouble('nc_mergeOutput', 1, no_cast) == 0
                      or meta.castString('nc_mergeOutput').upper() == 'FALSE')
    if not merge_disabled:
        cmd.append("--mergeScript=" + meta.castString('nc_mergeScript'))

    submit_flags = meta.castString('nc_EventLoop_SubmitFlags')
    if len(submit_flags):
        cmd += shlex.split(submit_flags)

    if meta.castDouble('nc_showCmd', 0, no_cast) != 0:
        print(cmd)

    # If the tarball does not exist, create it.
    # In case of a tarball creation issue return 1.
    if not os.path.isfile('jobcontents.tgz'):
        ext_files = "jobdef.root,runjob.sh"
        user_files = meta.castString('nc_EventLoop_UserFiles')
        if len(user_files):
            ext_files += "," + user_files.replace(" ", ",")

        dummycmd = list(cmd)
        dummycmd.append("--outTarBall=jobcontents.tgz")
        dummycmd.append("--extFile=" + ext_files)
        dummycmd.append("--noSubmit")

        try:
            subprocess.check_output(dummycmd, stderr=subprocess.STDOUT, encoding="utf-8")
        except subprocess.CalledProcessError as e:
            # Handle a case where we couldn't get the grid nickname in advance
            nickname_missing = ('Need to generate a grid proxy' in e.output
                                and any('%nickname%' in x for x in cmd))
            if not nickname_missing:
                # Failed to create tarball thus returning 1
                _report_failure(dummycmd, e)
                return 1

            print('Detected nickname still undefined. Trying to replace it.')
            try:
                from pandatools import PsubUtils
                nickname = PsubUtils.getNickname()
                dummycmd = [x.replace('%nickname%', nickname) for x in dummycmd]
                cmd = [x.replace('%nickname%', nickname) for x in cmd]
            except Exception as e_rep:
                # Arbitrary exceptions carry no returncode/output attributes,
                # so report the exception itself and still retry below.
                print(f'Nickname replacement failed with error: {e_rep!r}')

            # Now try the job again
            try:
                subprocess.check_output(dummycmd, stderr=subprocess.STDOUT, encoding="utf-8")
            except Exception as e_take2:
                # Failed to create tarball thus returning 1
                _report_failure(dummycmd, e_take2)
                return 1
        except Exception as e:
            # Catch any other exception
            # Failed to create tarball thus returning 1
            _report_failure(dummycmd, e)
            return 1

    cmd.append("--inTarBall=jobcontents.tgz")

    # If the user has not specified this flag castDouble returns -1.
    # Values lower than 1 (unset, zero or negative) fall back to the
    # default of 3 submission tries.
    nSubmitTries = int(meta.castDouble("nc_prunNRetrySubmitToGrid", -1, no_cast))
    if nSubmitTries < 1:
        nSubmitTries = 3

    out = ""
    successSubmission = False
    listErrorsMessagesTries = []
    for attempt in range(1, nSubmitTries + 1):
        if attempt > 1:
            # Wait for 2 seconds as an issue occurred on the previous try
            # and it could be due to a transient issue
            time.sleep(2)
        try:
            out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, encoding="utf-8")
        except Exception as e:
            # Failed to submit job: keep track of the error message
            listErrorsMessagesTries.append(_format_try_error(attempt, nSubmitTries, cmd, e))
        else:
            successSubmission = True
            break

    # NB: error messages are only printed if every try failed;
    # otherwise earlier failures are silenced.
    if not successSubmission:
        print(f"Failed submission after nTries={nSubmitTries}")
        print("\n".join(listErrorsMessagesTries))
        return 2

    found = re.search(r'TaskID=(\d+)', str(out))
    if found is None:
        print(out)
        return 3

    return int(found.group(1))
ELG_prun.ELG_prun
def ELG_prun(sample)
Definition: ELG_prun.py:18
replace
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition: hcg.cxx:310
MuonGM::round
float round(const float toRound, const unsigned int decimals)
Definition: Mdt.cxx:27
upper
int upper(int c)
Definition: LArBadChannelParser.cxx:49
ELG_prun.print
print
Definition: ELG_prun.py:15
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.CaloAddPedShiftConfig.int
int
Definition: CaloAddPedShiftConfig.py:45
str
Definition: BTagTrackIpAccessor.cxx:11