ATLAS Offline Software
sendEI_SPB_Lib.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
2 
3 import sys
4 import os
5 import json
6 import stomp
7 import socket
8 import uuid
9 import time
10 from urllib.parse import urlparse
11 
12 from EventIndexProducer.spbfile import SpbFile
13 
14 import argparse
15 import boto3
16 
17 from botocore.config import Config
18 
19 
class MSG(object):
    """Small wrapper around a STOMP connection used to publish one message.

    The settings are read from the parsed command-line options object.
    When ``dummy`` is set, no connection is made and nothing is sent.
    """

    def __init__(self, opt):
        """Copy the broker settings from ``opt``.

        ``opt`` must expose the attributes ``endpoint`` (list of
        ``(address, port)`` tuples), ``user``, ``passcode``, ``queue``,
        ``verbose`` and ``dummy``.
        """
        self.brokers = opt.endpoint
        self.user = opt.user
        self.passcode = opt.passcode
        self.queue = opt.queue
        self.verbose = opt.verbose
        self.dummy = opt.dummy

        # live stomp connection, or None while disconnected
        self.conn = None

    def isConnected(self):
        """Return True when a connection object is currently held."""
        return self.conn is not None

    def connect(self):
        """Open the connection to the configured brokers.

        Does nothing in dummy mode. Any exception raised by the stomp
        library (e.g. ``stomp.exception.ConnectFailedException``) is passed
        on to the caller, and ``self.conn`` stays ``None`` in that case.
        """
        # prepare for connection
        self.conn = None

        if self.dummy:
            return

        log.debug("Broker IPs and ports to connect:")
        for broker in self.brokers:
            log.debug(" {:15s} {}".format(*broker))

        # connect
        conn = stomp.Connection(self.brokers)
        conn.set_listener('', MyListener())
        if self.user is not None and self.passcode is not None:
            log.debug("Conecting broker using user and password")
            conn.connect(self.user, self.passcode, wait=True)
        else:
            conn.connect(wait=True)

        # only publish the handle once the connection succeeded
        self.conn = conn

    def close(self):
        """Disconnect from the broker, if connected.

        The handle is dropped before disconnecting so that the object never
        keeps a stale connection, even when ``disconnect()`` raises.
        """
        conn, self.conn = self.conn, None
        if conn is not None:
            conn.disconnect()

    def sendMSG(self, msg):
        """Send ``msg`` to the configured queue as a persistent message.

        In dummy mode nothing is sent. With verbosity above 2 the message
        is also echoed on standard error.

        Raises:
            RuntimeError: if not in dummy mode and ``connect()`` has not
                succeeded.
        """
        log.debug("Sending message. Len: {}".format(len(msg)))

        # NON_PERSISTENT = "1"
        PERSISTENT = "2"

        if not self.dummy:
            if self.conn is None:
                raise RuntimeError("sendMSG called without a broker "
                                   "connection")
            self.conn.send(self.queue, msg,
                           JMSDeliveryMode=PERSISTENT,
                           JMSExpiration=0)
        if self.verbose > 2:
            print(msg, file=sys.stderr)
84 
85 
86 # ----------------------
87 # listener
88 # ----------------------
90  def on_error(self, headers, message):
91  print('received an error', file=sys.stderr)
92  print(" headers: ", headers, file=sys.stderr)
93  print(" message: ", message, file=sys.stderr)
94 
95  def on_message(self, headers, message):
96  print('received a message', file=sys.stderr)
97  print(" headers: ", headers, file=sys.stderr)
98  print(" message: ", message, file=sys.stderr)
99 
100 
def _s3_host_port(endpoint, secure):
    """Split ``host[:port]``; the port defaults to 443 (secure) or 80."""
    try:
        (host, port) = endpoint.split(":")
    except ValueError:
        host = endpoint
        port = 443 if secure else 80
    try:
        port = int(port)
    except (TypeError, ValueError):
        log.info("Invalid AWS port {}".format(port))
        # raise instead of exiting so the caller can try its fallback
        raise ValueError("Invalid AWS port {}".format(port))
    return host, port


def _proxy_from_env(http_proxy):
    """Normalise the first entry of an ``http_proxy`` value.

    Accepts ``host``, ``host:port`` and ``scheme://host[:port][/]``; the
    port defaults to 3128. A scheme, when given, is preserved.
    """
    # take the first proxy if there are several
    first = http_proxy.split(',')[0].strip()
    prefix, sep, hostport = first.rpartition("://")
    scheme = prefix + sep  # empty when no scheme was given
    hostport = hostport.rstrip("/")
    host, colon, port = hostport.rpartition(":")
    if not colon:
        host, port = hostport, 3128
    try:
        port = int(port)
    except (TypeError, ValueError):
        log.info("Invalid proxy port {}".format(port))
        raise ValueError("Invalid proxy port {}".format(port))
    return "{}{}:{}".format(scheme, host, port)


def _task_flavor(task_id):
    """Map the suffix of a ``<id>.<letter>`` task ID to a storage flavor."""
    try:
        _, suffix = task_id.split('.')
    except Exception:
        return "unknown"
    return {'G': 'panda', 'T': 'tier0', 'X': 'test'}.get(suffix, 'unknown')


def osCopy(logger, opt, info):
    """Upload the EventIndex file to the S3 object store.

    Args:
        logger: logger installed as the module-level ``log``.
        opt: parsed options; reads ``eifile``, ``s3endpoint``, ``s3secure``,
            ``awsaccess``, ``awssecret``, ``s3bucket`` and ``http``.
        info: file information dict; reads ``uuid``, ``taskID`` and
            ``jobID``. Every entry except ``guids`` is attached to the
            object as string metadata.

    Returns:
        The location of the stored object: an http(s) URL without query
        string when ``opt.http`` is set, otherwise
        ``s3://host:port/bucket/key``.

    Raises:
        ValueError: the S3 or proxy port is not an integer.
        Exception: the S3 host cannot be resolved; errors from boto3 are
            logged and re-raised.
    """
    # logger
    global log
    log = logger

    fname = opt.eifile
    objID = info['uuid']

    # build connection
    s3host, s3port = _s3_host_port(opt.s3endpoint, opt.s3secure)

    # resolve host name. detect invalid connection endpoint
    try:
        socket.gethostbyname_ex(s3host)
    except Exception as err:
        log.info("S3 host can not be resolved {}".format(s3host))
        raise Exception("Invalid host {}".format(s3host)) from err

    scheme = "https" if opt.s3secure else "http"
    s3endpoint = "{}://{}:{}".format(scheme, s3host, s3port)

    s3Config = None

    # should we use a proxy ?
    # eg.: http_proxy=squid.grid.sinica.edu.tw:3128
    http_proxy = os.getenv('http_proxy')
    if http_proxy:  # an empty variable means "no proxy"
        http_proxy1 = _proxy_from_env(http_proxy)
        log.info("osCopy using proxy {}".format(http_proxy1))

        # use this proxy for both http and https
        s3Config = Config(proxies={'http': http_proxy1,
                                   'https': http_proxy1})

    # get s3 resource
    try:
        log.debug("osCopy: try to get s3 resoruce")
        s3 = boto3.resource(
            service_name='s3',
            aws_access_key_id=opt.awsaccess,
            aws_secret_access_key=opt.awssecret,
            endpoint_url=s3endpoint,
            verify=True,
            config=s3Config
        )
    except Exception as e:
        log.info("Unable to connect to object store " + str(e))
        raise

    # copy to object store
    bucket_name = opt.s3bucket

    flavor = _task_flavor(info.get('taskID'))
    keyname = "{0}/{1}/{2}_{3}_{4}.ei.spb".format(
        flavor,
        info['taskID'].split('.')[0],
        info['taskID'],
        info['jobID'],
        objID)

    # convert metadata values to strings
    # do not include guids
    eiMetadata = {k: str(v) for k, v in info.items() if k != 'guids'}

    # upload eiFile to s3 storage
    try:
        log.debug("osCopy: try to build object and upload eiFile")
        s3obj = s3.Object(bucket_name=bucket_name, key=keyname)
        s3obj.upload_file(fname,
                          ExtraArgs={"Metadata": eiMetadata,
                                     'ContentType':
                                     'application/octet-stream'})
        s3obj.Acl().put(ACL='public-read')
        if opt.http:
            # get s3 client from resource
            s3client = s3.meta.client
            # NOTE(review): HttpMethod is normally an HTTP verb such as
            # "GET"; kept as is because only scheme/host/port/path of the
            # resulting URL are used below (the query string is dropped).
            url = s3client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket_name, 'Key': keyname},
                HttpMethod="http")
    except Exception as e:
        log.info("Unable to store object " + str(e))
        raise

    if opt.http:
        u = urlparse(url)
        if u.port is not None:
            urlx = "{}://{}:{}{}".format(u.scheme, u.hostname, u.port,
                                         u.path)
        else:
            urlx = "{}://{}{}".format(u.scheme, u.hostname, u.path)
    else:
        urlx = "s3://{}:{}/{}/{}".format(s3host, s3port, bucket_name,
                                         keyname)

    log.debug("osCopy: eiFile successfully uploaded")
    return urlx
235 
236 
def eosCopy(logger, opt, info):
    """Copy the EventIndex file to EOS with ``xrdcp``.

    Args:
        logger: logger installed as the module-level ``log``.
        opt: parsed options; only ``eifile`` is read.
        info: file information dict; reads ``uuid``, ``taskID``, ``jobID``.

    Returns:
        The ``root://`` URL of the copied file.

    Raises:
        OSError: ``xrdcp`` could not be started.
        RuntimeError: ``xrdcp`` finished with a non-zero exit status.
    """
    # logger
    global log
    log = logger

    # local import: subprocess is not among the module-level imports
    import subprocess

    fname = opt.eifile
    objID = info['uuid']

    keyname = "{0}_{1}_{2}.ei.spb".format(info['taskID'], info['jobID'],
                                          objID)
    rfile = "root://eosatlas.cern.ch//eos/atlas/atlascerngroupdisk/" \
        "proj-evind/Consumer/data/{}".format(keyname)

    log.debug("eosCopy: using xrdcp -s {0} {1}".format(fname, rfile))
    try:
        # argument list, no shell: file names are passed through verbatim
        status = subprocess.run(["xrdcp", "-s", fname, rfile]).returncode
    except Exception as e:
        log.info("Unable to xrdcp file" + str(e))
        raise

    # a failed copy must not be reported as a valid URL
    if status != 0:
        log.info("Unable to xrdcp file: exit status {}".format(status))
        raise RuntimeError("xrdcp failed with exit status {}".format(status))

    return rfile
258 
259 
def endpointV(endpoint):
    """Validate a broker list and expand it to ``(ip, port)`` tuples.

    ``endpoint`` has the form ``broker1[:port1],broker2[:port2],...``.
    Whitespace around entries is ignored and empty entries (e.g. from a
    trailing comma) are skipped. Each host is resolved and one tuple is
    produced per address it resolves to.

    Raises:
        argparse.ArgumentTypeError: a port is not an integer in 1..65535,
            a host cannot be resolved, or no broker is given. Being an
            ``Exception`` subclass it is still caught by generic handlers,
            and argparse reports it as a usage error when this function is
            used as an argument ``type``.
    """
    # default port
    dport = 60013

    result = []
    for entry in endpoint.split(','):
        entry = entry.strip()
        if not entry:
            continue
        try:
            (host, port) = entry.split(":")
        except ValueError:
            host = entry
            port = dport
        try:
            port = int(port)
        except (TypeError, ValueError):
            port_ok = False
        else:
            port_ok = 0 < port < 65536
        if not port_ok:
            log.info("Invalid port {}".format(port))
            raise argparse.ArgumentTypeError("Invalid port {}".format(port))
        try:
            (_, _, addresses) = socket.gethostbyname_ex(host)
        except (OSError, UnicodeError) as err:
            log.info("Host can not be resolved {}".format(host))
            raise argparse.ArgumentTypeError(
                "Invalid host {}".format(host)) from err
        result.extend((addr, port) for addr in addresses)

    if not result:
        log.info("No broker given in endpoint {!r}".format(endpoint))
        raise argparse.ArgumentTypeError(
            "No broker given in endpoint {!r}".format(endpoint))

    return result
287 
288 
def _str2bool(value):
    """Convert a command-line word such as ``true``/``no``/``1`` to bool."""
    if isinstance(value, bool):
        return value
    word = str(value).strip().lower()
    if word in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if word in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError(
        "Boolean value expected, got {!r}".format(value))


def options(argv):
    """Parse the command line of the sendEI_SPB tool.

    Args:
        argv: list of argument strings (without the program name).

    Returns:
        The ``argparse`` namespace. ``endpoint`` is already converted by
        ``endpointV``; ``s3secure`` and ``eosfallback`` are real booleans,
        so ``--s3secure False`` disables the secure connection.

    The process exits with status 0 (after logging an error) when only one
    of user/passcode is given, or when the AWS keys are available neither
    on the command line nor in ``AWSACCESS``/``AWSSECRET``.
    """
    parser = argparse.ArgumentParser(description='SendEI_SPB to object store '
                                     'and notify EI supervisor.')
    parser.add_argument('-e', '--endpoint', default='localhost:61613',
                        type=endpointV,
                        help="broker name and port")
    parser.add_argument('-q', '--queue',
                        default='/queue/atlas.eventindex.supervisor',
                        help="broker queue name")
    parser.add_argument('-u', '--user',
                        default=None,
                        help="Stomp user name")
    parser.add_argument('-k', '--passcode',
                        default=None,
                        help="Stomp passcode")
    parser.add_argument('-v', '--verbose',
                        default=0,
                        action='count',
                        help='Verbosity level')
    parser.add_argument('-d', '--debug',
                        default=0,
                        action='count',
                        help='Debug')
    parser.add_argument("-n", "--dummy",
                        action='store_true',
                        default=False,
                        help="Do not send messages and do not connect")
    parser.add_argument("--s3endpoint",
                        default="localhost:443",
                        help="S3 AWS server name and port")
    parser.add_argument("--awsaccess",
                        default=None,
                        help="AWS access key. "
                        "(default from AWSACCESS env variable)")
    parser.add_argument("--awssecret",
                        default=None,
                        help="AWS secret key. "
                        "(default from AWSSECRET env variable)")
    # without a converter any given value, even "False", is a truthy string
    parser.add_argument("--s3secure",
                        default=True,
                        type=_str2bool,
                        help="Secure connection to S3 AWS server")
    parser.add_argument("--s3bucket",
                        default="atlas_eventindex",
                        help="S3 bucket name")
    parser.add_argument("--eosfallback",
                        default=False,
                        type=_str2bool,
                        help="Copy file into EOS if S3 fails")
    parser.add_argument("-x", "--http",
                        action='store_true',
                        default=False,
                        help="Generate object http public link")
    parser.add_argument("--objID",
                        default=None,
                        help="Object ID")

    parser.add_argument('eifile', help="EventIndex file")

    opt = parser.parse_args(args=argv)

    # user and passcode only make sense together
    if (opt.user is None) != (opt.passcode is None):
        log.error("Both, user and passcode must be specified or none")
        sys.exit(0)

    # an incomplete key pair on the command line is replaced as a whole by
    # the pair from the environment
    if opt.awsaccess is None or opt.awssecret is None:
        opt.awsaccess = os.getenv('AWSACCESS', None)
        opt.awssecret = os.getenv('AWSSECRET', None)

    if opt.awsaccess is None or opt.awssecret is None:
        log.error("Both ACCESS KEY and SECRET KEY must exists")
        sys.exit(0)

    return opt
363 
364 
def eimrun(logger, opt):
    """Store the EventIndex SPB file and notify the supervisor.

    Steps: read the file information, upload the file with ``osCopy``
    (falling back to ``eosCopy`` when ``opt.eosfallback`` is set), then send
    a JSON message with the file information, URL and timestamp to the
    broker.

    Args:
        logger: logger installed as the module-level ``log``.
        opt: parsed options as returned by ``options``.

    The process exits with status 0 (after logging) when the file is not
    readable, its information cannot be extracted, no copy succeeded, or
    the broker connection fails.
    """
    # logger
    global log
    log = logger

    # open EI file and get info
    fname = opt.eifile
    if not (os.path.isfile(fname) and os.access(fname, os.R_OK)):
        log.info("Event Index SPB file {} does not "
                 "exists or is not readble".format(fname))
        sys.exit(0)

    try:
        spbf = SpbFile(fname)
        info = spbf.getInfo()
    except Exception:
        log.info("Unable to get info from EI SPB file {}".format(fname))
        sys.exit(0)

    # remove keys not to be included
    for unwanted in ('provenanceRef', 'triggerInfo'):
        info.pop(unwanted, None)

    # prepare to copy
    objID = uuid.uuid4().hex if opt.objID is None else opt.objID
    info['uuid'] = objID

    urlx = None
    try:
        log.debug("call osCopy to transfer file")
        urlx = osCopy(logger, opt, info)
    except Exception as e:
        log.info("Error trying to copy file to ObjectStore: "+str(e))
        if opt.eosfallback:
            log.info("osCopy failed. Trying with eosCopy")
            # a failing fallback must end in the "unable to send" path
            # below rather than in an unhandled exception
            try:
                urlx = eosCopy(logger, opt, info)
            except Exception as e2:
                log.info("Error trying to copy file to EOS: "+str(e2))

    if urlx is None:
        log.info("Unable to send SPB file")
        sys.exit(0)

    # attach timestamp and file url into file info.
    # it will be sent to the broker messaging
    ts = int(time.time() * 1000)
    info['timestamp'] = ts
    info['url'] = urlx

    mbroker = MSG(opt)
    try:
        mbroker.connect()
    except stomp.exception.ConnectFailedException:
        log.info("Unable to connect to stomp broker")
        sys.exit(0)

    msg = {
        'type': 1,      # PROD -> SUP
        'data': info,   # payload
        'ts': ts,       # timestamp
        'id': objID,    # instance id
    }

    # always release the broker connection, even if sending fails
    try:
        mbroker.sendMSG(json.dumps(msg))

        if opt.verbose > 0:
            log.info("=========== sendEI SPB summary ==========")
            log.info(" endpoint: {}".format(opt.endpoint))
            log.info(" queue: {}".format(opt.queue))
            log.info(" url: {}".format(urlx))
            for g in info['guids']:
                log.info(" guid[{:03d}]: {} {:7d} {:7d}".format(
                    g['fileno'], g['guid'], g['nevents'], g['nuevents']))
            log.info(" number of guids: {:10d}".format(info['nfiles']))
            log.info(" number of events: {:10d}".format(info['nevents']))
            log.info(" number of unique evt: {:10d}".format(
                info['nuevents']))
            log.info(" file size: {:10d} bytes".format(info['size']))
            log.info(" uncompressed file size: {:10d} bytes".format(
                info['usize']))
            if int(info['nevents']) != 0:
                log.info(" mean size per evt: {:10.2f} bytes".format(
                    float(info['size'])/int(info['nevents'])))
                log.info(" mean usize per evt: {:10.2f} bytes".format(
                    float(info['usize'])/int(info['nevents'])))

        # wait 1 second to let messaging processing
        time.sleep(1)
    finally:
        mbroker.close()
python.sendEI_SPB_Lib.MyListener.on_error
def on_error(self, headers, message)
Definition: sendEI_SPB_Lib.py:90
python.sendEI_SPB_Lib.endpointV
def endpointV(endpoint)
Definition: sendEI_SPB_Lib.py:260
python.sendEI_SPB_Lib.MSG.sendMSG
def sendMSG(self, msg)
Definition: sendEI_SPB_Lib.py:71
python.sendEI_SPB_Lib.eimrun
def eimrun(logger, opt)
Definition: sendEI_SPB_Lib.py:365
python.sendEI_SPB_Lib.options
def options(argv)
Definition: sendEI_SPB_Lib.py:289
vtune_athena.format
format
Definition: vtune_athena.py:14
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
python.sendEI_SPB_Lib.MSG.conn
conn
Definition: sendEI_SPB_Lib.py:31
python.sendEI_SPB_Lib.MyListener
Definition: sendEI_SPB_Lib.py:89
python.sendEI_SPB_Lib.MyListener.on_message
def on_message(self, headers, message)
Definition: sendEI_SPB_Lib.py:95
python.sendEI_SPB_Lib.MSG.isConnected
def isConnected(self)
Definition: sendEI_SPB_Lib.py:33
python.sendEI_SPB_Lib.MSG.__init__
def __init__(self, opt)
Definition: sendEI_SPB_Lib.py:22
python.sendEI_SPB_Lib.MSG.connect
def connect(self)
Definition: sendEI_SPB_Lib.py:39
python.sendEI_SPB_Lib.MSG.dummy
dummy
Definition: sendEI_SPB_Lib.py:29
python.sendEI_SPB_Lib.osCopy
def osCopy(logger, opt, info)
Definition: sendEI_SPB_Lib.py:101
python.sendEI_SPB_Lib.eosCopy
def eosCopy(logger, opt, info)
Definition: sendEI_SPB_Lib.py:237
python.sendEI_SPB_Lib.MSG.queue
queue
Definition: sendEI_SPB_Lib.py:27
python.sendEI_SPB_Lib.MSG.verbose
verbose
Definition: sendEI_SPB_Lib.py:28
python.sendEI_SPB_Lib.MSG.user
user
Definition: sendEI_SPB_Lib.py:25
python.sendEI_SPB_Lib.MSG.close
def close(self)
Definition: sendEI_SPB_Lib.py:66
python.sendEI_SPB_Lib.MSG
Definition: sendEI_SPB_Lib.py:20
pickleTool.object
object
Definition: pickleTool.py:30
str
Definition: BTagTrackIpAccessor.cxx:11
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70
python.sendEI_SPB_Lib.MSG.brokers
brokers
Definition: sendEI_SPB_Lib.py:24
python.sendEI_SPB_Lib.MSG.passcode
passcode
Definition: sendEI_SPB_Lib.py:26
readCCLHist.float
float
Definition: readCCLHist.py:83
Trk::split
@ split
Definition: LayerMaterialProperties.h:38