ATLAS Offline Software
Loading...
Searching...
No Matches
ELG_prun.py
Go to the documentation of this file.
1# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2
3
4import os.path
5import subprocess
6import re
7import shlex
8import time
9
10# Force flushing print
11# Since this script is executed within a TPython::Exec() function call in the PrunDriver class,
12# if not forcing flushing then no printed messages in this script would be displayed to the user
13# (unless an error is raised then the buffer would also be printed)
14import functools
15print = functools.partial(print, flush=True)
16
17
def _report_tarball_failure(command, error):
    """Print why the tarball-creation command failed.

    For a ``subprocess.CalledProcessError`` the command, return code and
    captured output stored on the exception are printed; for any other
    exception the attempted ``command`` and the exception itself are printed.
    """
    print ("Command:")
    if isinstance(error, subprocess.CalledProcessError):
        print (error.cmd)
        print ("failed with return code " , error.returncode)
        print ("output was:")
        print (error.output)
    else:
        print (command)
        print ("failed and output was:")
        print (error)


def _describe_submit_failure(command, error, attempt, n_tries):
    """Return the text block describing one failed submission attempt.

    ``attempt`` is 1-based. The layout differs slightly between a
    ``subprocess.CalledProcessError`` (return code + captured output, trailing
    newline) and any other exception (exception text, no trailing newline).
    """
    banner = "-" * 60
    lines = [banner,
             f"iTry={attempt} out of nTries={n_tries}",
             banner,
             "Command:"]
    if isinstance(error, subprocess.CalledProcessError):
        lines += [f"{error.cmd}",
                  f"failed with return code {error.returncode}",
                  "output was:",
                  f"{error.output}",
                  ""]  # empty last entry -> message ends with a newline
    else:
        lines += [f"{command}",
                  "failed and output was:",
                  f"{error}"]
    return "\n".join(lines)


def ELG_prun(sample):
    """Build a ``prun`` command line from the sample meta-data and run it.

    All settings are read from ``sample.meta()`` under keys prefixed with
    ``nc_``. If ``jobcontents.tgz`` does not exist in the current directory it
    is first created by running the same command with ``--outTarBall`` and
    ``--noSubmit``; the real submission then uses ``--inTarBall``.

    Returns:
        int: the task ID parsed from ``TaskID=<digits>`` in the prun output on
        success, otherwise one of the following codes:

        * 99 - ``pandatools`` could not be imported
        * 1  - creation of the tarball failed
        * 2  - submission failed on every try
        * 3  - prun succeeded but no ``TaskID=`` was found in its output
    """
    # Important: only return as integer 1 if the creation of the tarball was unsuccessful as the PrunDriver
    # relies on that to stop the submission if tarball creation was unsuccessful

    try:
        from pandatools import PandaToolsPkgInfo  # noqa: F401
    except ImportError:
        print ("prun needs additional setup, try:")
        print (" lsetup panda")
        return 99

    cmd = ["prun"]

    # These are options (with a value) that can be set by the user
    opts = ['destSE',
            'site',
            'rootVer',
            'cmtConfig',
            'excludedSite',
            'nGBPerJob',
            'memory',
            'maxCpuCount',
            'nFiles',
            'nFilesPerJob',
            'nEventsPerJob',
            'nJobs',
            'maxFileSize',
            'maxNFilesPerJob',
            'addNthFieldOfInDSToLFN',
            'cpuTimePerEvent',
            'maxWalltime',
            'voms',
            'workingGroup',
            'tmpDir']

    # These are on/off switches (no value) that can be set by the user
    switches = ['express',
                'noSubmit',
                'skipScout',
                'disableAutoRetry',
                'useNewCode',
                'official',
                'mergeOutput',
                'useRootCore',
                'useAthenaPackages',
                'avoidVP']

    from ROOT import SH
    no_default = SH.MetaObject.CAST_NOCAST_DEFAULT
    meta = sample.meta()

    using_nEventsPerJob = False
    for opt in opts:
        key = 'nc_' + opt
        # -1 is the "not set / not numeric" marker: anything else is a number
        number = meta.castDouble(key, -1, no_default)
        if abs(number + 1) > 1e-6:
            cmd += ["--" + opt + "=" + str(int(round(number)))]
            if opt == "nEventsPerJob":
                using_nEventsPerJob = True
        else:
            text = meta.castString(key)
            # len() rather than truthiness: the value comes from the ROOT bindings
            if len(text):
                cmd += ["--" + opt + "=" + text]

    # nGBPerJob and nEventsPerJob are incompatible to prun
    if using_nEventsPerJob:
        # Match the option itself rather than any argument merely containing the text
        cmd = [x for x in cmd if not x.startswith("--nGBPerJob=")]
        # Print the command as it stands after the filter
        print(cmd)

    for switch in switches:
        key = 'nc_' + switch
        number = meta.castDouble(key, 0, no_default)
        if number != 0:
            cmd += ["--" + switch]
        else:
            text = meta.castString(key)
            # Any non-empty string enables the switch unless it spells "false"
            if len(text):
                if text != "False" and text != "false" and text != "FALSE":
                    cmd += ["--" + switch]

    # These options should normally not be touched by the user
    internalOpts = ['exec',
                    'inDS',
                    'outDS',
                    'outputs',
                    'writeInputToTxt',
                    'match',
                    'framework']

    for opt in internalOpts:
        value = meta.castString('nc_' + opt)
        if opt == "exec" and using_nEventsPerJob:
            value += " %SKIPEVENTS %MAXEVENTS"
        cmd += ["--" + opt + "=" + value]

    # Only pass a merge script when output merging has not been switched off
    merge_disabled = (meta.castDouble('nc_mergeOutput', 1, no_default) == 0
                      or meta.castString('nc_mergeOutput').upper() == 'FALSE')
    if not merge_disabled:
        cmd += ["--mergeScript=" + meta.castString('nc_mergeScript')]

    # Free-form extra flags, split the way a shell would split them
    extra_flags = meta.castString('nc_EventLoop_SubmitFlags')
    if len(extra_flags):
        cmd += shlex.split (extra_flags)

    if meta.castDouble('nc_showCmd', 0, no_default) != 0:
        print (cmd)

    # If tarball is not existing create it
    # In case of tarball creation issue return 1
    if not os.path.isfile('jobcontents.tgz'):
        # The command only holds strings, so a shallow copy is sufficient
        dummycmd = list(cmd)
        dummycmd += ["--outTarBall=jobcontents.tgz"]
        user_files = meta.castString('nc_EventLoop_UserFiles')
        if len(user_files):
            dummycmd += ["--extFile=jobdef.root,runjob.sh," + user_files.replace(" ", ",")]
        else:
            dummycmd += ["--extFile=jobdef.root,runjob.sh"]
        dummycmd += ["--noSubmit"]

        try:
            subprocess.check_output(dummycmd, stderr=subprocess.STDOUT, encoding="utf-8")
        except subprocess.CalledProcessError as e:
            # Handle a case where we couldn't get the grid nickname in advance
            nickname_missing = ('Need to generate a grid proxy' in e.output
                                and any('%nickname%' in x for x in cmd))
            if not nickname_missing:
                # Failed to create tarball thus returning 1
                _report_tarball_failure(dummycmd, e)
                return 1

            print('Detected nickname still undefined. Trying to replace it.')
            try:
                from pandatools import PsubUtils
                nickname = PsubUtils.getNickname()
                dummycmd = [x.replace('%nickname%', nickname) for x in dummycmd]
                cmd = [x.replace('%nickname%', nickname) for x in cmd]
            except Exception as e_rep:
                # A generic exception has no returncode/output attributes, so
                # report its type and message instead of failing in the handler
                print(f'Nickname replacement failed with error {type(e_rep).__name__}: {e_rep}')

            # Now try the job again
            try:
                subprocess.check_output(dummycmd, stderr=subprocess.STDOUT, encoding="utf-8")
            except Exception as e_take2:
                # Failed to create tarball thus returning 1
                _report_tarball_failure(dummycmd, e_take2)
                return 1
        except Exception as e:
            # Catch any other exception
            # Failed to create tarball thus returning 1
            _report_tarball_failure(dummycmd, e)
            return 1

    cmd += ["--inTarBall=jobcontents.tgz"]

    # If user has not specified this flag it will return -1
    nSubmitTries = int(meta.castDouble("nc_prunNRetrySubmitToGrid", -1, no_default))
    # Make sure nSubmitTries is not lower than 1
    # Could happen if user has specified a negative or 0 as value
    # or if user has not set the nc_prunNRetrySubmitToGrid value
    # In both cases assign the default value: 3 submission tries
    if nSubmitTries < 1:
        nSubmitTries = 3

    out = ""
    listErrorsMessagesTries = []
    for attempt in range(1, nSubmitTries + 1):
        if attempt > 1:
            # Wait for 2 seconds as an issue occurred on the previous try
            # and it could be due to a transient issue
            time.sleep(2)
        try:
            out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, encoding="utf-8")
        except Exception as e:
            # Failed to submit job: keep track of the error message
            listErrorsMessagesTries.append(
                _describe_submit_failure(cmd, e, attempt, nSubmitTries))
        else:
            # Submission was successful, no further tries needed
            break
    else:
        # Every try failed
        # NB: error messages are only printed if the submission failed,
        # failures of earlier tries are silenced when a later try succeeds
        print(f"Failed submission after nTries={nSubmitTries}")
        print("\n".join(listErrorsMessagesTries))
        return 2

    # Extract the task ID from the first "TaskID=<digits>" in the prun output
    found = re.search(r'TaskID=(\d+)', str(out))
    if found is None:
        print (out)
        return 3

    return int(found.group(1))
int upper(int c)
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition hcg.cxx:310