10 from urllib.parse
import urlparse
12 from EventIndexProducer.spbfile
import SpbFile
17 from botocore.config
import Config
47 log.debug(
"Broker IPs and ports to connect:")
49 log.debug(
" {:15s} {}".
format(*broker))
52 conn = stomp.Connection(self.
brokers)
56 log.debug(
"Conecting broker using user and password")
59 conn.connect(wait=
True)
60 except stomp.exception.ConnectFailedException:
67 if self.
conn is not None:
68 self.
conn.disconnect()
73 log.debug(
"Sending message. Len: {}".
format(len(msg)))
80 JMSDeliveryMode=PERSISTENT,
83 print(msg, file=sys.stderr)
91 print(
'received an error', file=sys.stderr)
92 print(
" headers: ", headers, file=sys.stderr)
93 print(
" message: ", message, file=sys.stderr)
96 print(
'received a message', file=sys.stderr)
97 print(
" headers: ", headers, file=sys.stderr)
98 print(
" message: ", message, file=sys.stderr)
112 (s3host, s3port) = opt.s3endpoint.split(
":")
114 s3host = opt.s3endpoint
122 log.info(
"Invalid AWS port {}".
format(s3port))
127 (h, a, ip) = socket.gethostbyname_ex(s3host)
129 log.info(
"S3 host can not be resolved {}".
format(s3host))
130 raise Exception(
"Invalid host {}".
format(s3host))
133 s3endpoint =
"https://{}:{}".
format(s3host, s3port)
135 s3endpoint =
"http://{}:{}".
format(s3host, s3port)
141 http_proxy = os.getenv(
'http_proxy')
142 if http_proxy
is not None:
144 http_proxy0 = http_proxy.split(
',')[0]
146 (proxy, proxy_port) = http_proxy0.split(
":")
151 proxy_port =
int(proxy_port)
153 log.info(
"Invalid proxy port {}".
format(proxy_port))
155 http_proxy1 =
"{}:{}".
format(proxy, proxy_port)
156 log.info(
"osCopy using proxy {}".
format(http_proxy1))
159 s3Config = Config(proxies={
'http': http_proxy1,
'https': http_proxy1})
163 log.debug(
"osCopy: try to get s3 resoruce")
166 aws_access_key_id=opt.awsaccess,
167 aws_secret_access_key=opt.awssecret,
168 endpoint_url=s3endpoint,
172 except Exception
as e:
173 log.info(
"Unable to connect to object store " +
str(e))
177 bucket_name = opt.s3bucket
180 taskID_1, taskID_2 = info[
'taskID'].
split(
'.')
183 elif taskID_2 ==
'T':
185 elif taskID_2 ==
'X':
192 keyname =
"{0}/{1}/{2}_{3}_{4}.ei.spb".
format(flavor,
193 info[
'taskID'].
split(
'.')[0],
200 eiMetadata = {k: info[k].__str__()
for k
in info
if k !=
'guids'}
204 log.debug(
"osCopy: try to build object and upload eiFile")
205 object = s3.Object(bucket_name=bucket_name, key=keyname)
206 object.upload_file(fname,
207 ExtraArgs={
"Metadata": eiMetadata,
209 'application/octet-stream'})
210 object_acl = object.Acl()
211 object_acl.put(ACL=
'public-read')
214 s3client = s3.meta.client
215 url = s3client.generate_presigned_url(
'get_object',
220 except Exception
as e:
221 log.info(
"Unable to store object " +
str(e))
226 if u.port
is not None:
227 urlx =
"{}://{}:{}{}".
format(u.scheme, u.hostname, u.port, u.path)
229 urlx =
"{}://{}{}".
format(u.scheme, u.hostname, u.path)
231 urlx =
"s3://{}:{}/{}/{}".
format(s3host, s3port, bucket_name, keyname)
233 log.debug(
"osCopy: eiFile successfully uploaded")
246 keyname =
"{0}_{1}_{2}.ei.spb".
format(info[
'taskID'], info[
'jobID'], objID)
247 rfile =
"root://eosatlas.cern.ch//eos/atlas/atlascerngroupdisk/" \
248 "proj-evind/Consumer/data/{}".
format(keyname)
251 log.debug(
"eosCopy: using xrdcp -s {0} {1}".
format(fname, rfile))
252 os.system(
"xrdcp -s {0} {1}".
format(fname, rfile))
253 except Exception
as e:
254 log.info(
"Unable to xrdcp file" +
str(e))
265 lbroker = endpoint.split(
',')
269 (host, port) = b.split(
":")
276 log.info(
"Invalid port {}".
format(port))
277 raise Exception(
"Invalid port {}".
format(port))
279 (h, a, ip) = socket.gethostbyname_ex(host)
281 log.info(
"Host can not be resolved {}".
format(host))
282 raise Exception(
"Invalid host {}".
format(host))
284 result.append((addr, port))
291 parser = argparse.ArgumentParser(description=
'SendEI_SPB to object store '
292 'and notify EI supervisor.')
293 parser.add_argument(
'-e',
'--endpoint', default=
'localhost:61613',
295 help=
"broker name and port")
296 parser.add_argument(
'-q',
'--queue',
297 default=
'/queue/atlas.eventindex.supervisor',
298 help=
"broker queue name")
299 parser.add_argument(
'-u',
'--user',
301 help=
"Stomp user name")
302 parser.add_argument(
'-k',
'--passcode',
304 help=
"Stomp passcode")
305 parser.add_argument(
'-v',
'--verbose',
308 help=
'Verbosity level')
309 parser.add_argument(
'-d',
'--debug',
313 parser.add_argument(
"-n",
"--dummy",
316 help=
"Do not send messages and do not connect")
317 parser.add_argument(
"--s3endpoint",
318 default=
"localhost:443",
319 help=
"S3 AWS server name and port")
320 parser.add_argument(
"--awsaccess",
322 help=
"AWS access key. "
323 "(default from AWSACCESS env variable)")
324 parser.add_argument(
"--awssecret",
326 help=
"AWS secret key. "
327 "(default from AWSSECRET env variable)")
328 parser.add_argument(
"--s3secure",
330 help=
"Secure connection to S3 AWS server")
331 parser.add_argument(
"--s3bucket",
332 default=
"atlas_eventindex",
333 help=
"S3 bucket name")
334 parser.add_argument(
"--eosfallback",
336 help=
"Copy file into EOS if S3 fails")
337 parser.add_argument(
"-x",
"--http",
340 help=
"Generate object http public link")
341 parser.add_argument(
"--objID",
345 parser.add_argument(
'eifile', help=
"EventIndex file")
347 opt = parser.parse_args(args=argv)
349 if opt.user
is None or opt.passcode
is None:
350 if not(opt.user
is None and opt.passcode
is None):
351 log.error(
"Both, user and passcode must be specified or none")
354 if opt.awsaccess
is None or opt.awssecret
is None:
355 opt.awsaccess = os.getenv(
'AWSACCESS',
None)
356 opt.awssecret = os.getenv(
'AWSSECRET',
None)
358 if opt.awsaccess
is None or opt.awssecret
is None:
359 log.error(
"Both ACCESS KEY and SECRET KEY must exists")
373 if not (os.path.isfile(fname)
and os.access(fname, os.R_OK)):
374 log.info(
"Event Index SPB file {} does not "
375 "exists or is not readble".
format(fname))
379 spbf = SpbFile(fname)
380 info = spbf.getInfo()
382 log.info(
"Unable to get info from EI SPB file {}".
format(fname))
386 if 'provenanceRef' in info:
387 del info[
'provenanceRef']
388 if 'triggerInfo' in info:
389 del info[
'triggerInfo']
392 if opt.objID
is None:
393 objID = uuid.uuid4().hex
400 log.debug(
"call osCopy to transfer file")
401 urlx =
osCopy(logger, opt, info)
402 except Exception
as e:
403 log.info(
"Error trying to copy file to ObjectStore: "+
str(e))
405 log.info(
"osCopy failed. Trying with eosCopy")
406 urlx =
eosCopy(logger, opt, info)
409 log.info(
"Unable to send SPB file")
414 ts =
int(time.time() * 1000)
415 info[
'timestamp'] = ts
421 except stomp.exception.ConnectFailedException:
422 log.info(
"Unable to connect to stomp broker")
430 mbroker.sendMSG(json.dumps(msg))
433 log.info(
"=========== sendEI SPB summary ==========")
434 log.info(
" endpoint: {}".
format(opt.endpoint))
435 log.info(
" queue: {}".
format(opt.queue))
436 log.info(
" url: {}".
format(urlx))
437 for g
in info[
'guids']:
438 log.info(
" guid[{:03d}]: {} {:7d} {:7d}".
format(
439 g[
'fileno'], g[
'guid'], g[
'nevents'], g[
'nuevents']))
440 log.info(
" number of guids: {:10d}".
format(info[
'nfiles']))
441 log.info(
" number of events: {:10d}".
format(info[
'nevents']))
442 log.info(
" number of unique evt: {:10d}".
format(info[
'nuevents']))
443 log.info(
" file size: {:10d} bytes".
format(info[
'size']))
444 log.info(
" uncompressed file size: {:10d} bytes".
format(info[
'usize']))
445 if int(info[
'nevents']) != 0:
446 log.info(
" mean size per evt: {:10.2f} bytes".
format(
447 float(info[
'size'])/
int(info[
'nevents'])))
448 log.info(
" mean usize per evt: {:10.2f} bytes".
format(
449 float(info[
'usize'])/
int(info[
'nevents'])))