ATLAS Offline Software
Loading...
Searching...
No Matches
python.sendEI_SPB_Lib Namespace Reference

Classes

class  MSG
class  MyListener

Functions

 osCopy (logger, opt, info)
 eosCopy (logger, opt, info)
 endpointV (endpoint)
 options (argv)
 eimrun (logger, opt)

Function Documentation

◆ eimrun()

python.sendEI_SPB_Lib.eimrun ( logger,
opt )

Definition at line 365 of file sendEI_SPB_Lib.py.

365def eimrun(logger, opt):
366
367 # logger
368 global log
369 log = logger
370
371 # open EI file and get info
372 fname = opt.eifile
373 if not (os.path.isfile(fname) and os.access(fname, os.R_OK)):
374 log.info("Event Index SPB file {} does not "
375 "exists or is not readble".format(fname))
376 sys.exit(0)
377
378 try:
379 spbf = SpbFile(fname)
380 info = spbf.getInfo()
381 except Exception:
382 log.info("Unable to get info from EI SPB file {}".format(fname))
383 sys.exit(0)
384
385 # remove keys not to be included
386 if 'provenanceRef' in info:
387 del info['provenanceRef']
388 if 'triggerInfo' in info:
389 del info['triggerInfo']
390
391 # prepare to copy
392 if opt.objID is None:
393 objID = uuid.uuid4().hex
394 else:
395 objID = opt.objID
396 info['uuid'] = objID
397
398 urlx = None
399 try:
400 log.debug("call osCopy to transfer file")
401 urlx = osCopy(logger, opt, info)
402 except Exception as e:
403 log.info("Error trying to copy file to ObjectStore: "+str(e))
404 if opt.eosfallback:
405 log.info("osCopy failed. Trying with eosCopy")
406 urlx = eosCopy(logger, opt, info)
407
408 if urlx is None:
409 log.info("Unable to send SPB file")
410 sys.exit(0)
411
412 # attach timestamp and file url into file info.
413 # it will be sent to the broker messaging
414 ts = int(time.time() * 1000)
415 info['timestamp'] = ts
416 info['url'] = urlx
417
418 mbroker = MSG(opt)
419 try:
420 mbroker.connect()
421 except stomp.exception.ConnectFailedException:
422 log.info("Unable to connect to stomp broker")
423 sys.exit(0)
424
425 msg = {}
426 msg['type'] = 1 # PROD -> SUP
427 msg['data'] = info # payload
428 msg['ts'] = ts # timestamp
429 msg['id'] = objID # instance id
430 mbroker.sendMSG(json.dumps(msg))
431
432 if opt.verbose > 0:
433 log.info("=========== sendEI SPB summary ==========")
434 log.info(" endpoint: {}".format(opt.endpoint))
435 log.info(" queue: {}".format(opt.queue))
436 log.info(" url: {}".format(urlx))
437 for g in info['guids']:
438 log.info(" guid[{:03d}]: {} {:7d} {:7d}".format(
439 g['fileno'], g['guid'], g['nevents'], g['nuevents']))
440 log.info(" number of guids: {:10d}".format(info['nfiles']))
441 log.info(" number of events: {:10d}".format(info['nevents']))
442 log.info(" number of unique evt: {:10d}".format(info['nuevents']))
443 log.info(" file size: {:10d} bytes".format(info['size']))
444 log.info(" uncompressed file size: {:10d} bytes".format(info['usize']))
445 if int(info['nevents']) != 0:
446 log.info(" mean size per evt: {:10.2f} bytes".format(
447 float(info['size'])/int(info['nevents'])))
448 log.info(" mean usize per evt: {:10.2f} bytes".format(
449 float(info['usize'])/int(info['nevents'])))
450
451 # wait 1 second to let messaging processing
452 time.sleep(1)
453 mbroker.close()
Definition MsgLevel.h:28

◆ endpointV()

python.sendEI_SPB_Lib.endpointV ( endpoint)

Definition at line 260 of file sendEI_SPB_Lib.py.

260def endpointV(endpoint):
261 # default port
262 dport = 60013
263
264 # enpoint should be broker1[:port1],broker2:[port2],...
265 lbroker = endpoint.split(',')
266 result = []
267 for b in lbroker:
268 try:
269 (host, port) = b.split(":")
270 except Exception:
271 host = b
272 port = dport
273 try:
274 port = int(port)
275 except Exception:
276 log.info("Invalid port {}".format(port))
277 raise Exception("Invalid port {}".format(port))
278 try:
279 (h, a, ip) = socket.gethostbyname_ex(host)
280 except Exception:
281 log.info("Host can not be resolved {}".format(host))
282 raise Exception("Invalid host {}".format(host))
283 for addr in ip:
284 result.append((addr, port))
285
286 return result
287
288

◆ eosCopy()

python.sendEI_SPB_Lib.eosCopy ( logger,
opt,
info )

Definition at line 237 of file sendEI_SPB_Lib.py.

237def eosCopy(logger, opt, info):
238
239 # logger
240 global log
241 log = logger
242
243 fname = opt.eifile
244 objID = info['uuid']
245
246 keyname = "{0}_{1}_{2}.ei.spb".format(info['taskID'], info['jobID'], objID)
247 rfile = "root://eosatlas.cern.ch//eos/atlas/atlascerngroupdisk/" \
248 "proj-evind/Consumer/data/{}".format(keyname)
249
250 try:
251 log.debug("eosCopy: using xrdcp -s {0} {1}".format(fname, rfile))
252 os.system("xrdcp -s {0} {1}".format(fname, rfile))
253 except Exception as e:
254 log.info("Unable to xrdcp file" + str(e))
255 raise
256
257 return rfile
258
259

◆ options()

python.sendEI_SPB_Lib.options ( argv)

Definition at line 289 of file sendEI_SPB_Lib.py.

289def options(argv):
290
291 parser = argparse.ArgumentParser(description='SendEI_SPB to object store '
292 'and notify EI supervisor.')
293 parser.add_argument('-e', '--endpoint', default='localhost:61613',
294 type=endpointV,
295 help="broker name and port")
296 parser.add_argument('-q', '--queue',
297 default='/queue/atlas.eventindex.supervisor',
298 help="broker queue name")
299 parser.add_argument('-u', '--user',
300 default=None,
301 help="Stomp user name")
302 parser.add_argument('-k', '--passcode',
303 default=None,
304 help="Stomp passcode")
305 parser.add_argument('-v', '--verbose',
306 default=0,
307 action='count',
308 help='Verbosity level')
309 parser.add_argument('-d', '--debug',
310 default=0,
311 action='count',
312 help='Debug')
313 parser.add_argument("-n", "--dummy",
314 action='store_true',
315 default=False,
316 help="Do not send messages and do not connect")
317 parser.add_argument("--s3endpoint",
318 default="localhost:443",
319 help="S3 AWS server name and port")
320 parser.add_argument("--awsaccess",
321 default=None,
322 help="AWS access key. "
323 "(default from AWSACCESS env variable)")
324 parser.add_argument("--awssecret",
325 default=None,
326 help="AWS secret key. "
327 "(default from AWSSECRET env variable)")
328 parser.add_argument("--s3secure",
329 default=True,
330 help="Secure connection to S3 AWS server")
331 parser.add_argument("--s3bucket",
332 default="atlas_eventindex",
333 help="S3 bucket name")
334 parser.add_argument("--eosfallback",
335 default=False,
336 help="Copy file into EOS if S3 fails")
337 parser.add_argument("-x", "--http",
338 action='store_true',
339 default=False,
340 help="Generate object http public link")
341 parser.add_argument("--objID",
342 default=None,
343 help="Object ID")
344
345 parser.add_argument('eifile', help="EventIndex file")
346
347 opt = parser.parse_args(args=argv)
348
349 if opt.user is None or opt.passcode is None:
350 if not(opt.user is None and opt.passcode is None):
351 log.error("Both, user and passcode must be specified or none")
352 sys.exit(0)
353
354 if opt.awsaccess is None or opt.awssecret is None:
355 opt.awsaccess = os.getenv('AWSACCESS', None)
356 opt.awssecret = os.getenv('AWSSECRET', None)
357
358 if opt.awsaccess is None or opt.awssecret is None:
359 log.error("Both ACCESS KEY and SECRET KEY must exists")
360 sys.exit(0)
361
362 return opt
363
364

◆ osCopy()

python.sendEI_SPB_Lib.osCopy ( logger,
opt,
info )

Definition at line 101 of file sendEI_SPB_Lib.py.

101def osCopy(logger, opt, info):
102
103 # logger
104 global log
105 log = logger
106
107 fname = opt.eifile
108 objID = info['uuid']
109
110 # build connection
111 try:
112 (s3host, s3port) = opt.s3endpoint.split(":")
113 except Exception:
114 s3host = opt.s3endpoint
115 if opt.s3secure:
116 s3port = 443
117 else:
118 s3port = 80
119 try:
120 s3port = int(s3port)
121 except Exception:
122 log.info("Invalid AWS port {}".format(s3port))
123 sys.exit(0)
124
125 # resolve host name. detect invalid connection endpoint
126 try:
127 (h, a, ip) = socket.gethostbyname_ex(s3host)
128 except Exception:
129 log.info("S3 host can not be resolved {}".format(s3host))
130 raise Exception("Invalid host {}".format(s3host))
131
132 if opt.s3secure:
133 s3endpoint = "https://{}:{}".format(s3host, s3port)
134 else:
135 s3endpoint = "http://{}:{}".format(s3host, s3port)
136
137 s3Config = None
138
139 # should we use a proxy ?
140 # eg.: http_proxy=squid.grid.sinica.edu.tw:3128
141 http_proxy = os.getenv('http_proxy')
142 if http_proxy is not None:
143 # take the first proxy if there are several
144 http_proxy0 = http_proxy.split(',')[0]
145 try:
146 (proxy, proxy_port) = http_proxy0.split(":")
147 except Exception:
148 proxy = http_proxy0
149 proxy_port = 3128
150 try:
151 proxy_port = int(proxy_port)
152 except Exception:
153 log.info("Invalid proxy port {}".format(proxy_port))
154 sys.exit(0)
155 http_proxy1 = "{}:{}".format(proxy, proxy_port)
156 log.info("osCopy using proxy {}".format(http_proxy1))
157
158 # use this proxy for both http and https
159 s3Config = Config(proxies={'http': http_proxy1, 'https': http_proxy1})
160
161 # get s3 resource
162 try:
163 log.debug("osCopy: try to get s3 resoruce")
164 s3 = boto3.resource(
165 service_name='s3',
166 aws_access_key_id=opt.awsaccess,
167 aws_secret_access_key=opt.awssecret,
168 endpoint_url=s3endpoint,
169 verify=True,
170 config=s3Config
171 )
172 except Exception as e:
173 log.info("Unable to connect to object store " + str(e))
174 raise
175
176 # copy to object store
177 bucket_name = opt.s3bucket
178
179 try:
180 taskID_1, taskID_2 = info['taskID'].split('.')
181 if taskID_2 == 'G':
182 flavor = 'panda'
183 elif taskID_2 == 'T':
184 flavor = 'tier0'
185 elif taskID_2 == 'X':
186 flavor = 'test'
187 else:
188 flavor = 'unknown'
189 except Exception:
190 flavor = "unknown"
191
192 keyname = "{0}/{1}/{2}_{3}_{4}.ei.spb".format(flavor,
193 info['taskID'].split('.')[0],
194 info['taskID'],
195 info['jobID'],
196 objID)
197
198 # convert metada values to strings
199 # do not include guids
200 eiMetadata = {k: info[k].__str__() for k in info if k != 'guids'}
201
202 # upload eiFile to s3 storage
203 try:
204 log.debug("osCopy: try to build object and upload eiFile")
205 object = s3.Object(bucket_name=bucket_name, key=keyname)
206 object.upload_file(fname,
207 ExtraArgs={"Metadata": eiMetadata,
208 'ContentType':
209 'application/octet-stream'})
210 object_acl = object.Acl()
211 object_acl.put(ACL='public-read')
212 if opt.http:
213 # get s3 client from resouce
214 s3client = s3.meta.client
215 url = s3client.generate_presigned_url('get_object',
216 Params={'Bucket':
217 bucket_name,
218 'Key': keyname},
219 HttpMethod="http")
220 except Exception as e:
221 log.info("Unable to store object " + str(e))
222 raise
223
224 if opt.http:
225 u = urlparse(url)
226 if u.port is not None:
227 urlx = "{}://{}:{}{}".format(u.scheme, u.hostname, u.port, u.path)
228 else:
229 urlx = "{}://{}{}".format(u.scheme, u.hostname, u.path)
230 else:
231 urlx = "s3://{}:{}/{}/{}".format(s3host, s3port, bucket_name, keyname)
232
233 log.debug("osCopy: eiFile successfully uploaded")
234 return urlx
235
236
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177