112 (s3host, s3port) = opt.s3endpoint.split(
":")
114 s3host = opt.s3endpoint
122 log.info(
"Invalid AWS port {}".format(s3port))
127 (h, a, ip) = socket.gethostbyname_ex(s3host)
129 log.info(
"S3 host can not be resolved {}".format(s3host))
130 raise Exception(
"Invalid host {}".format(s3host))
133 s3endpoint =
"https://{}:{}".format(s3host, s3port)
135 s3endpoint =
"http://{}:{}".format(s3host, s3port)
141 http_proxy = os.getenv(
'http_proxy')
142 if http_proxy
is not None:
144 http_proxy0 = http_proxy.split(
',')[0]
146 (proxy, proxy_port) = http_proxy0.split(
":")
151 proxy_port = int(proxy_port)
153 log.info(
"Invalid proxy port {}".format(proxy_port))
155 http_proxy1 =
"{}:{}".format(proxy, proxy_port)
156 log.info(
"osCopy using proxy {}".format(http_proxy1))
159 s3Config =
Config(proxies={
'http': http_proxy1,
'https': http_proxy1})
163 log.debug(
"osCopy: try to get s3 resoruce")
166 aws_access_key_id=opt.awsaccess,
167 aws_secret_access_key=opt.awssecret,
168 endpoint_url=s3endpoint,
172 except Exception
as e:
173 log.info(
"Unable to connect to object store " + str(e))
177 bucket_name = opt.s3bucket
180 taskID_1, taskID_2 = info[
'taskID'].
split(
'.')
183 elif taskID_2 ==
'T':
185 elif taskID_2 ==
'X':
192 keyname =
"{0}/{1}/{2}_{3}_{4}.ei.spb".format(flavor,
193 info[
'taskID'].
split(
'.')[0],
200 eiMetadata = {k: info[k].__str__()
for k
in info
if k !=
'guids'}
204 log.debug(
"osCopy: try to build object and upload eiFile")
205 object = s3.Object(bucket_name=bucket_name, key=keyname)
206 object.upload_file(fname,
207 ExtraArgs={
"Metadata": eiMetadata,
209 'application/octet-stream'})
210 object_acl = object.Acl()
211 object_acl.put(ACL=
'public-read')
214 s3client = s3.meta.client
215 url = s3client.generate_presigned_url(
'get_object',
220 except Exception
as e:
221 log.info(
"Unable to store object " + str(e))
226 if u.port
is not None:
227 urlx =
"{}://{}:{}{}".format(u.scheme, u.hostname, u.port, u.path)
229 urlx =
"{}://{}{}".format(u.scheme, u.hostname, u.path)
231 urlx =
"s3://{}:{}/{}/{}".format(s3host, s3port, bucket_name, keyname)
233 log.debug(
"osCopy: eiFile successfully uploaded")
373 if not (os.path.isfile(fname)
and os.access(fname, os.R_OK)):
374 log.info(
"Event Index SPB file {} does not "
375 "exists or is not readble".format(fname))
379 spbf = SpbFile(fname)
380 info = spbf.getInfo()
382 log.info(
"Unable to get info from EI SPB file {}".format(fname))
386 if 'provenanceRef' in info:
387 del info[
'provenanceRef']
388 if 'triggerInfo' in info:
389 del info[
'triggerInfo']
392 if opt.objID
is None:
393 objID = uuid.uuid4().hex
400 log.debug(
"call osCopy to transfer file")
401 urlx =
osCopy(logger, opt, info)
402 except Exception
as e:
403 log.info(
"Error trying to copy file to ObjectStore: "+str(e))
405 log.info(
"osCopy failed. Trying with eosCopy")
406 urlx =
eosCopy(logger, opt, info)
409 log.info(
"Unable to send SPB file")
414 ts = int(time.time() * 1000)
415 info[
'timestamp'] = ts
421 except stomp.exception.ConnectFailedException:
422 log.info(
"Unable to connect to stomp broker")
430 mbroker.sendMSG(json.dumps(msg))
433 log.info(
"=========== sendEI SPB summary ==========")
434 log.info(
" endpoint: {}".format(opt.endpoint))
435 log.info(
" queue: {}".format(opt.queue))
436 log.info(
" url: {}".format(urlx))
437 for g
in info[
'guids']:
438 log.info(
" guid[{:03d}]: {} {:7d} {:7d}".format(
439 g[
'fileno'], g[
'guid'], g[
'nevents'], g[
'nuevents']))
440 log.info(
" number of guids: {:10d}".format(info[
'nfiles']))
441 log.info(
" number of events: {:10d}".format(info[
'nevents']))
442 log.info(
" number of unique evt: {:10d}".format(info[
'nuevents']))
443 log.info(
" file size: {:10d} bytes".format(info[
'size']))
444 log.info(
" uncompressed file size: {:10d} bytes".format(info[
'usize']))
445 if int(info[
'nevents']) != 0:
446 log.info(
" mean size per evt: {:10.2f} bytes".format(
447 float(info[
'size'])/int(info[
'nevents'])))
448 log.info(
" mean usize per evt: {:10.2f} bytes".format(
449 float(info[
'usize'])/int(info[
'nevents'])))