ATLAS Offline Software
Loading...
Searching...
No Matches
sendEI_SPB_Lib.py
Go to the documentation of this file.
1# Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
2
3import sys
4import os
5import json
6import stomp
7import socket
8import uuid
9import time
10from urllib.parse import urlparse
11
12from EventIndexProducer.spbfile import SpbFile
13
14import argparse
15import boto3
16
17from botocore.config import Config
18
19
class MSG(object):
    """Small wrapper around a STOMP connection used to publish messages.

    Broker addresses, credentials, destination queue, verbosity and the
    ``dummy`` flag are all taken from the parsed options object.  In dummy
    mode no connection is opened and nothing is sent to the broker.
    """

    def __init__(self, opt):
        # there is no live connection until connect() succeeds
        self.conn = None

        # where to send and how to authenticate
        self.brokers = opt.endpoint
        self.queue = opt.queue
        self.user = opt.user
        self.passcode = opt.passcode

        # behaviour switches
        self.dummy = opt.dummy
        self.verbose = opt.verbose

    def isConnected(self):
        """Return True when a connection object is currently stored."""
        return self.conn is not None

    def connect(self):
        """Open the STOMP connection and keep it in ``self.conn``.

        Does nothing besides resetting ``self.conn`` when running in dummy
        mode.  Credentials are used only when both user and passcode are
        set.  Any exception raised while connecting is passed on to the
        caller and ``self.conn`` stays ``None``.
        """
        # forget any previous connection
        self.conn = None
        if self.dummy:
            return

        log.debug("Broker IPs and ports to connect:")
        for address in self.brokers:
            log.debug(" {:15s} {}".format(*address))

        link = stomp.Connection(self.brokers)
        link.set_listener('', MyListener())

        anonymous = self.user is None or self.passcode is None
        if anonymous:
            link.connect(wait=True)
        else:
            log.debug("Conecting broker using user and password")
            link.connect(self.user, self.passcode, wait=True)

        # only remember the connection once it is established
        self.conn = link

    def close(self):
        """Disconnect from the broker if a connection is stored."""
        if self.conn is None:
            return
        self.conn.disconnect()
        self.conn = None

    def sendMSG(self, msg):
        """Publish ``msg`` on the configured queue.

        Nothing is sent in dummy mode.  With verbosity above 2 the message
        is also echoed on stderr.
        """
        log.debug("Sending message. Len: {}".format(len(msg)))

        if not self.dummy:
            # JMS delivery modes: "1" is non persistent, "2" is persistent
            delivery_mode = "2"
            self.conn.send(self.queue, msg,
                           JMSDeliveryMode=delivery_mode,
                           JMSExpiration=0)

        if self.verbose > 2:
            print(msg, file=sys.stderr)
84
85
86# ----------------------
87# listener
88# ----------------------
90 def on_error(self, headers, message):
91 print('received an error', file=sys.stderr)
92 print(" headers: ", headers, file=sys.stderr)
93 print(" message: ", message, file=sys.stderr)
94
95 def on_message(self, headers, message):
96 print('received a message', file=sys.stderr)
97 print(" headers: ", headers, file=sys.stderr)
98 print(" message: ", message, file=sys.stderr)
99
100
def _splitHostPort(address, default_port):
    """Split ``host:port`` into ``(host, port)``.

    Anything that is not exactly ``host:port`` is returned unchanged as the
    host together with ``default_port``.  The port is not converted here.
    """
    parts = address.split(":")
    if len(parts) == 2:
        return parts[0], parts[1]
    return address, default_port


def _parseProxy(http_proxy, default_port=3128):
    """Return ``(proxy, port)`` for the first entry of an http_proxy value.

    An optional ``scheme://`` prefix is kept as part of the returned proxy
    and any trailing path is dropped, so both ``squid:3128`` and
    ``http://squid:3128/`` are understood.  The port is returned as found
    (string) or ``default_port`` when the entry has none.
    """
    # take the first proxy if there are several
    first = http_proxy.split(',')[0].strip()
    # the scheme contains a ':' of its own, so set it aside before splitting
    scheme, sep, rest = first.rpartition("://")
    authority = rest.split("/", 1)[0]
    host, port = _splitHostPort(authority, default_port)
    return scheme + sep + host, port


def _flavorFromTaskID(taskID):
    """Map the part after the dot in ``<id>.<letter>`` to a flavor name."""
    try:
        _, suffix = taskID.split('.')
    except Exception:
        return 'unknown'
    return {'G': 'panda', 'T': 'tier0', 'X': 'test'}.get(suffix, 'unknown')


def osCopy(logger, opt, info):
    """Upload the EventIndex file to the S3 object store.

    Args:
        logger: logger object; it is also stored in the module global ``log``.
        opt: parsed options.  Reads ``eifile``, ``s3endpoint``, ``s3secure``,
            ``awsaccess``, ``awssecret``, ``s3bucket`` and ``http``.
        info: file information; ``uuid``, ``taskID`` and ``jobID`` build the
            object key, and every entry except ``guids`` is attached to the
            object as string metadata.

    Returns:
        The object location: an http(s) link without query string when
        ``opt.http`` is set, otherwise ``s3://host:port/bucket/key``.

    Raises:
        Exception: the S3 host can not be resolved.  Errors raised while
            creating the S3 resource or uploading are logged and re-raised.

    An invalid S3 or proxy port is logged and ends the process through
    ``sys.exit(0)``.
    """
    # logger
    global log
    log = logger

    fname = opt.eifile
    objID = info['uuid']

    # build connection
    s3host, s3port = _splitHostPort(opt.s3endpoint,
                                    443 if opt.s3secure else 80)
    try:
        s3port = int(s3port)
    except ValueError:
        log.info("Invalid AWS port {}".format(s3port))
        sys.exit(0)

    # resolve host name. detect invalid connection endpoint
    try:
        socket.gethostbyname_ex(s3host)
    except Exception:
        log.info("S3 host can not be resolved {}".format(s3host))
        raise Exception("Invalid host {}".format(s3host))

    s3endpoint = "{}://{}:{}".format("https" if opt.s3secure else "http",
                                     s3host, s3port)

    s3Config = None

    # should we use a proxy ?
    # eg.: http_proxy=squid.grid.sinica.edu.tw:3128
    # an empty value is treated as "no proxy"
    http_proxy = os.getenv('http_proxy')
    if http_proxy is not None and http_proxy.strip():
        proxy, proxy_port = _parseProxy(http_proxy)
        try:
            proxy_port = int(proxy_port)
        except ValueError:
            log.info("Invalid proxy port {}".format(proxy_port))
            sys.exit(0)
        http_proxy1 = "{}:{}".format(proxy, proxy_port)
        log.info("osCopy using proxy {}".format(http_proxy1))

        # use this proxy for both http and https
        s3Config = Config(proxies={'http': http_proxy1, 'https': http_proxy1})

    # get s3 resource
    try:
        log.debug("osCopy: try to get s3 resoruce")
        s3 = boto3.resource(
            service_name='s3',
            aws_access_key_id=opt.awsaccess,
            aws_secret_access_key=opt.awssecret,
            endpoint_url=s3endpoint,
            verify=True,
            config=s3Config
        )
    except Exception as e:
        log.info("Unable to connect to object store " + str(e))
        raise

    # copy to object store
    bucket_name = opt.s3bucket
    flavor = _flavorFromTaskID(info['taskID'])
    keyname = "{0}/{1}/{2}_{3}_{4}.ei.spb".format(flavor,
                                                  info['taskID'].split('.')[0],
                                                  info['taskID'],
                                                  info['jobID'],
                                                  objID)

    # convert metadata values to strings
    # do not include guids
    eiMetadata = {k: str(v) for k, v in info.items() if k != 'guids'}

    # upload eiFile to s3 storage
    try:
        log.debug("osCopy: try to build object and upload eiFile")
        s3obj = s3.Object(bucket_name=bucket_name, key=keyname)
        s3obj.upload_file(fname,
                          ExtraArgs={"Metadata": eiMetadata,
                                     'ContentType':
                                     'application/octet-stream'})
        s3obj.Acl().put(ACL='public-read')
        if opt.http:
            # get s3 client from resource.  HttpMethod is the HTTP verb of
            # the request; only scheme, host, port and path of the generated
            # link are used below, the signed query string is dropped
            s3client = s3.meta.client
            url = s3client.generate_presigned_url('get_object',
                                                  Params={'Bucket':
                                                          bucket_name,
                                                          'Key': keyname},
                                                  HttpMethod="GET")
    except Exception as e:
        log.info("Unable to store object " + str(e))
        raise

    if opt.http:
        u = urlparse(url)
        if u.port is not None:
            urlx = "{}://{}:{}{}".format(u.scheme, u.hostname, u.port, u.path)
        else:
            urlx = "{}://{}{}".format(u.scheme, u.hostname, u.path)
    else:
        urlx = "s3://{}:{}/{}/{}".format(s3host, s3port, bucket_name, keyname)

    log.debug("osCopy: eiFile successfully uploaded")
    return urlx
235
236
def eosCopy(logger, opt, info):
    """Copy the EventIndex file to EOS with ``xrdcp``.

    Args:
        logger: logger object; it is also stored in the module global ``log``.
        opt: parsed options; only ``eifile`` is read.
        info: file information; ``taskID``, ``jobID`` and ``uuid`` build the
            remote file name.

    Returns:
        The ``root://`` URL of the remote copy.

    Raises:
        subprocess.CalledProcessError: ``xrdcp`` ended with a non-zero status.
        OSError: ``xrdcp`` could not be started.
    """
    # logger
    global log
    log = logger

    # imported here so the block does not depend on the file level imports
    import subprocess

    fname = opt.eifile
    objID = info['uuid']

    keyname = "{0}_{1}_{2}.ei.spb".format(info['taskID'], info['jobID'], objID)
    rfile = "root://eosatlas.cern.ch//eos/atlas/atlascerngroupdisk/" \
        "proj-evind/Consumer/data/{}".format(keyname)

    try:
        log.debug("eosCopy: using xrdcp -s {0} {1}".format(fname, rfile))
        # argument list, no shell: file names are passed verbatim, and
        # check=True turns a failed copy into an exception instead of
        # silently returning the URL of a file that was never written
        subprocess.run(["xrdcp", "-s", fname, rfile], check=True)
    except Exception as e:
        log.info("Unable to xrdcp file" + str(e))
        raise

    return rfile
258
259
def endpointV(endpoint):
    """Validate a broker endpoint string and expand it to addresses.

    Args:
        endpoint: ``broker1[:port1],broker2[:port2],...``.  Entries without
            a port get the default port 60013.  Surrounding blanks are
            ignored and empty entries are skipped.

    Returns:
        A list of ``(ip_address, port)`` tuples, one per address each host
        name resolves to.

    Raises:
        argparse.ArgumentTypeError: a port is not an integer or a host can
            not be resolved.  This lets argparse report the problem as a
            usage error when the function is used as a ``type=`` converter.
    """
    # default port
    dport = 60013

    # this function runs while the command line is parsed, which can be
    # before any entry point has set the module level logger
    logger = globals().get('log')

    def reject(log_text, error_text):
        if logger is not None:
            logger.info(log_text)
        raise argparse.ArgumentTypeError(error_text)

    result = []
    for entry in endpoint.split(','):
        entry = entry.strip()
        if not entry:
            continue

        parts = entry.split(":")
        if len(parts) == 2:
            host, port = parts
        else:
            host, port = entry, dport

        try:
            port = int(port)
        except ValueError:
            reject("Invalid port {}".format(port),
                   "Invalid port {}".format(port))

        try:
            addresses = socket.gethostbyname_ex(host)[2]
        except Exception:
            reject("Host can not be resolved {}".format(host),
                   "Invalid host {}".format(host))

        result.extend((addr, port) for addr in addresses)

    return result
287
288
def _boolOption(value):
    """argparse converter for options that take a true/false value."""
    text = str(value).strip().lower()
    if text in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if text in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError(
        "expected a boolean value, got {!r}".format(value))


def _optionsError(message):
    """Report a command line problem and end the process with status 0."""
    # options() can run before any entry point has set the module logger
    logger = globals().get('log')
    if logger is not None:
        logger.error(message)
    else:
        print(message, file=sys.stderr)
    sys.exit(0)


def options(argv):
    """Parse the sendEI_SPB command line.

    Args:
        argv: list of command line arguments, without the program name.

    Returns:
        The argparse namespace.  ``endpoint`` holds the address list built
        by ``endpointV``; ``s3secure`` and ``eosfallback`` are booleans;
        ``awsaccess`` and ``awssecret`` fall back to the ``AWSACCESS`` and
        ``AWSSECRET`` environment variables when not given.

    The process ends through ``sys.exit(0)`` when only one of user and
    passcode is given, or when an AWS key is missing.
    """
    parser = argparse.ArgumentParser(description='SendEI_SPB to object store '
                                     'and notify EI supervisor.')
    parser.add_argument('-e', '--endpoint', default='localhost:61613',
                        type=endpointV,
                        help="broker name and port")
    parser.add_argument('-q', '--queue',
                        default='/queue/atlas.eventindex.supervisor',
                        help="broker queue name")
    parser.add_argument('-u', '--user',
                        default=None,
                        help="Stomp user name")
    parser.add_argument('-k', '--passcode',
                        default=None,
                        help="Stomp passcode")
    parser.add_argument('-v', '--verbose',
                        default=0,
                        action='count',
                        help='Verbosity level')
    parser.add_argument('-d', '--debug',
                        default=0,
                        action='count',
                        help='Debug')
    parser.add_argument("-n", "--dummy",
                        action='store_true',
                        default=False,
                        help="Do not send messages and do not connect")
    parser.add_argument("--s3endpoint",
                        default="localhost:443",
                        help="S3 AWS server name and port")
    parser.add_argument("--awsaccess",
                        default=None,
                        help="AWS access key. "
                        "(default from AWSACCESS env variable)")
    parser.add_argument("--awssecret",
                        default=None,
                        help="AWS secret key. "
                        "(default from AWSSECRET env variable)")
    # without a converter any value, "False" included, is a non-empty
    # string and therefore true
    parser.add_argument("--s3secure",
                        default=True,
                        type=_boolOption,
                        help="Secure connection to S3 AWS server")
    parser.add_argument("--s3bucket",
                        default="atlas_eventindex",
                        help="S3 bucket name")
    parser.add_argument("--eosfallback",
                        default=False,
                        type=_boolOption,
                        help="Copy file into EOS if S3 fails")
    parser.add_argument("-x", "--http",
                        action='store_true',
                        default=False,
                        help="Generate object http public link")
    parser.add_argument("--objID",
                        default=None,
                        help="Object ID")

    parser.add_argument('eifile', help="EventIndex file")

    opt = parser.parse_args(args=argv)

    # user and passcode go together
    if (opt.user is None) != (opt.passcode is None):
        _optionsError("Both, user and passcode must be specified or none")

    # each key falls back to its own environment variable, so a key given
    # on the command line is never replaced
    if opt.awsaccess is None:
        opt.awsaccess = os.getenv('AWSACCESS', None)
    if opt.awssecret is None:
        opt.awssecret = os.getenv('AWSSECRET', None)

    if opt.awsaccess is None or opt.awssecret is None:
        _optionsError("Both ACCESS KEY and SECRET KEY must exists")

    return opt
363
364
def _logSummary(logger, opt, info, urlx):
    """Log what was sent: destination, url, guids and size figures."""
    logger.info("=========== sendEI SPB summary ==========")
    logger.info(" endpoint: {}".format(opt.endpoint))
    logger.info(" queue: {}".format(opt.queue))
    logger.info(" url: {}".format(urlx))
    for g in info['guids']:
        logger.info(" guid[{:03d}]: {} {:7d} {:7d}".format(
            g['fileno'], g['guid'], g['nevents'], g['nuevents']))
    logger.info(" number of guids: {:10d}".format(info['nfiles']))
    logger.info(" number of events: {:10d}".format(info['nevents']))
    logger.info(" number of unique evt: {:10d}".format(info['nuevents']))
    logger.info(" file size: {:10d} bytes".format(info['size']))
    logger.info(" uncompressed file size: {:10d} bytes".format(info['usize']))
    nevents = int(info['nevents'])
    if nevents != 0:
        # per event averages only make sense for a non empty file
        logger.info(" mean size per evt: {:10.2f} bytes".format(
            float(info['size'])/nevents))
        logger.info(" mean usize per evt: {:10.2f} bytes".format(
            float(info['usize'])/nevents))


def eimrun(logger, opt):
    """Store an EventIndex SPB file and announce it on the message broker.

    The file is copied with ``osCopy``; when that fails and
    ``opt.eosfallback`` is set, ``eosCopy`` is tried.  The file information,
    extended with a uuid, a millisecond timestamp and the resulting url, is
    then sent as a JSON message through ``MSG``.

    Args:
        logger: logger object; it is also stored in the module global ``log``.
        opt: parsed options as returned by ``options``.

    Every failure that is detected here (unreadable file, unreadable file
    information, no copy succeeded, broker connection refused) is logged and
    ends the process through ``sys.exit(0)``.
    """
    # logger
    global log
    log = logger

    # open EI file and get info
    fname = opt.eifile
    if not (os.path.isfile(fname) and os.access(fname, os.R_OK)):
        log.info("Event Index SPB file {} does not "
                 "exists or is not readble".format(fname))
        sys.exit(0)

    try:
        spbf = SpbFile(fname)
        info = spbf.getInfo()
    except Exception:
        log.info("Unable to get info from EI SPB file {}".format(fname))
        sys.exit(0)

    # remove keys not to be included
    for key in ('provenanceRef', 'triggerInfo'):
        if key in info:
            del info[key]

    # prepare to copy
    objID = uuid.uuid4().hex if opt.objID is None else opt.objID
    info['uuid'] = objID

    urlx = None
    try:
        log.debug("call osCopy to transfer file")
        urlx = osCopy(logger, opt, info)
    except Exception as e:
        log.info("Error trying to copy file to ObjectStore: "+str(e))
        if opt.eosfallback:
            log.info("osCopy failed. Trying with eosCopy")
            try:
                urlx = eosCopy(logger, opt, info)
            except Exception as e2:
                # handled like every other failure: log it and fall through
                # to the "unable to send" exit below
                log.info("Error trying to copy file to EOS: " + str(e2))

    if urlx is None:
        log.info("Unable to send SPB file")
        sys.exit(0)

    # attach timestamp and file url into file info.
    # it will be sent to the broker messaging
    ts = int(time.time() * 1000)
    info['timestamp'] = ts
    info['url'] = urlx

    mbroker = MSG(opt)
    try:
        mbroker.connect()
    except stomp.exception.ConnectFailedException:
        log.info("Unable to connect to stomp broker")
        sys.exit(0)

    msg = {
        'type': 1,       # PROD -> SUP
        'data': info,    # payload
        'ts': ts,        # timestamp
        'id': objID,     # instance id
    }

    try:
        mbroker.sendMSG(json.dumps(msg))

        if opt.verbose > 0:
            _logSummary(log, opt, info, urlx)

        # wait 1 second to let messaging processing
        time.sleep(1)
    finally:
        # release the broker connection even when sending or the summary fail
        mbroker.close()
void print(char *figname, TCanvas *c1)
on_error(self, headers, message)
on_message(self, headers, message)
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
eosCopy(logger, opt, info)
osCopy(logger, opt, info)