ATLAS Offline Software
Loading...
Searching...
No Matches
general.py
Go to the documentation of this file.
1# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2
3"""
4Utility module for things not specific to DQ
5"""
6
7
8from contextlib import contextmanager
9from logging import getLogger; log = getLogger("DQUtils.general")
10
11from time import time, mktime, strptime, strftime, gmtime
12from subprocess import getoutput
13
def get_package_version(package_name, default_prefix="DataQuality"):
    """
    Ask "cmt show versions" about a package and return its version text.

    If `package_name` contains no "/" and `default_prefix` is truthy, the
    prefix is prepended as "<default_prefix>/<package_name>".

    The return value is the first line of the command output with its first
    whitespace-separated field removed and the remaining fields joined by
    single spaces.
    """
    if "/" not in package_name and default_prefix:
        package_name = default_prefix + "/" + package_name

    command = "cmt show versions %s" % package_name
    lines = [line.strip() for line in getoutput(command).split("\n")]

    # Only the first output line is inspected; its leading field is dropped.
    fields = lines[0].split()
    return " ".join(fields[1:])
23
@contextmanager
def kgrind_profile(filename="profile.kgrind"):
    """
    Context manager producing kcachegrind profile output for a block.

    A fresh cProfile.Profile is yielded to the caller. When the block exits
    (normally or through an exception), the collected data is written to
    `filename` via KCacheGrind.

    Example use:

        with kgrind_profile() as p:
            p.run("main()")
    """
    from cProfile import Profile

    profiler = Profile()
    try:
        yield profiler
    finally:
        # Imported lazily so the converter is only needed when profiling.
        from .ext.lsprofcalltree import KCacheGrind

        with open(filename, "w") as out_file:
            converter = KCacheGrind(profiler)
            converter.output(out_file)
42
def daemonize(function, delay, error_args=None, **kwargs):
    """
    Call `function(**kwargs)` forever, sleeping `delay` seconds before
    each call.

    The loop only ends when an exception escapes. If `error_args` is not
    None, an error email is sent first with
    `send_error_email(sys.exc_info(), **error_args)`; the exception is then
    re-raised in every case.
    """
    from time import sleep

    try:
        while True:
            log.info("Sleeping for %i seconds..", delay)
            sleep(delay)

            timestamp = strftime("%d/%m/%Y %H:%M:%S")
            log.info("Current time: %s", timestamp)

            function(**kwargs)
    except Exception:
        if error_args is None:
            raise

        import sys
        send_error_email(sys.exc_info(), **error_args)
        raise
61
def send_error_email(exception_info, from_, subject, body):
    """
    Send an error email describing the exception in `exception_info`.

    Arguments:
      exception_info: a (type, value, traceback) triple, as returned by
                      sys.exc_info()
      from_:          sender address, used both for the SMTP envelope and
                      the "From" header
      subject:        text of the "Subject" header
      body:           leading paragraphs of the message; either a sequence
                      of strings or a single string

    The message text is `body`, followed by a line naming the current user
    and host, followed by the formatted exception (traceback entries plus
    the exception type and message), all separated by blank lines.

    The SMTP connection is opened with smtplib's default connect()
    arguments and is always closed, even if sending fails. Exceptions raised
    by smtplib propagate to the caller.
    """
    import smtplib
    import traceback

    from email.mime.text import MIMEText
    from getpass import getuser
    from socket import gethostname

    exception_type, exception_value, exception_traceback = exception_info

    # Accept a bare string as well as a sequence of paragraphs; copying also
    # avoids modifying the caller's list.
    paragraphs = [body] if isinstance(body, str) else list(body)

    user, host = getuser(), gethostname()
    paragraphs.append("Was running as user '%s' on machine '%s'" % (user, host))
    #"Code located at: %s" % code

    # format_exception gives the traceback entries *and* the final
    # "ExceptionType: message" line, which format_tb alone leaves out.
    paragraphs.extend(traceback.format_exception(exception_type,
                                                 exception_value,
                                                 exception_traceback))

    msg = MIMEText("\n\n".join(paragraphs))

    msg['Subject'] = subject
    msg['From'] = from_

    to = [
        #"Peter Waller <peter.waller@cern.ch>",
        "Peter Onyisi <peter.onyisi@cern.ch>"
    ]

    #operations = ("Data Quality Operations "
    #              "<hn-atlas-data-quality-operations@cern.ch>")

    msg['To'] = ", ".join(to)
    #msg['Cc'] = operations; to.append(operations)

    # The context manager issues QUIT and closes the socket on exit, so the
    # connection is released even when sendmail() raises.
    with smtplib.SMTP() as server:
        server.connect()
        server.sendmail(from_, to, msg.as_string())

    log.error("Error email sent.")
105
def all_equal(*inputs):
    """
    Tell whether every positional argument compares equal to the others.

    The arguments are collected into a set, so they must be hashable.
    Returns True when exactly one distinct value is present; this means a
    call with no arguments returns False.
    """
    distinct_values = set(inputs)
    return len(distinct_values) == 1
111
def all_permutations(input_list):
    """
    Lazily yield every ordering of the elements of `input_list`.

    `input_list` must support len(), slicing and "+" concatenation of
    slices (e.g. list, tuple, str). A sequence of length 0 or 1 is yielded
    back unchanged (the very same object). Longer sequences are handled
    recursively: the first element is inserted at each possible position of
    every permutation of the remaining elements.
    """
    if len(input_list) <= 1:
        yield input_list
        return

    head = input_list[0:1]
    for tail_perm in all_permutations(input_list[1:]):
        for position in range(len(tail_perm) + 1):
            yield tail_perm[:position] + head + tail_perm[position:]
122
123# Taken from:
124# http://caolanmcmahon.com/flatten_for_python
126 result = _flatten(l, lambda x: x)
127 while type(result) is list and len(result) and callable(result[0]):
128 if result[1] != []:
129 yield result[1]
130 result = result[0]([])
131 yield result
132
133def _flatten(l, fn, val=[]):
134 if type(l) is not list:
135 return fn(l)
136 if len(l) == 0:
137 return fn(val)
138 return [lambda x: _flatten(l[0], lambda y: _flatten(l[1:],fn,y), x), val]
139
@contextmanager
def silence(log_level=None, logger=None):
    """
    Turn down the logging verbosity temporarily

    Arguments:
      log_level: level to apply inside the block; logging.WARNING if None
      logger:    logger to adjust; the root logger if None

    The logger's previous level is restored when the block exits, including
    when it exits through an exception.
    """
    # The module-level `log` is a Logger instance, which has neither a
    # WARNING attribute nor a getLogger() method, so the defaults must come
    # from the logging module itself.
    import logging

    if log_level is None:
        log_level = logging.WARNING

    if logger is None:
        logger = logging.getLogger()

    orig_level = logger.level
    logger.setLevel(log_level)

    try:
        yield
    finally:
        logger.setLevel(orig_level)
158
@contextmanager
def timer(name):
    """
    Context manager which logs, at debug level, how long the enclosed
    block took to execute. `name` is included in the log message. The
    message is emitted even if the block raises.
    """
    started_at = time()
    try:
        yield
    finally:
        elapsed = time() - started_at
        log.debug("Took %.2f to %s", elapsed, name)
168
def interleave(*args):
    """
    Merge several iterables element by element into one list.

    The result holds the first element of every argument, then the second
    of every argument, and so on. As with zip(), output stops at the end of
    the shortest argument; calling with no arguments gives an empty list.
    """
    merged = []
    for group in zip(*args):
        merged.extend(group)
    return merged
171
def date_to_nanounix(date_string):
    """
    Convert a date written as 'dd/mm/yyyy' into an integer number of
    nanoseconds since the unix epoch.

    The parsed date is converted with time.mktime, which interprets it as
    local time. A string that does not match the format raises ValueError
    (from strptime).
    """
    parsed = strptime(date_string, "%d/%m/%Y")
    seconds = mktime(parsed)
    return int(seconds * 1e9)
178
def nanounix_to_date(nanounix):
    """
    Returns a string representation of `nanounix` nanoseconds
    since the unix epoch

    The format is "YYYYmmdd:HHMMSS.mmm" in UTC, with the sub-second part
    truncated to milliseconds. If the value cannot be converted to a
    calendar time, "[BadTime: <nanounix>]" is returned instead.
    """
    # Split into whole seconds and leftover nanoseconds without going through
    # a float quotient: `nanounix / 1e9` cannot represent present-day
    # timestamps to the nanosecond and may round up to the next second, which
    # would disagree with the millisecond field.
    seconds, nanoseconds = divmod(nanounix, 10**9)
    sub_second = "%03i" % (nanoseconds // 10**6)
    try:
        return strftime("%Y%m%d:%H%M%S.", gmtime(seconds)) + sub_second
    except (ValueError, OverflowError, OSError):
        # gmtime reports out-of-range timestamps with OverflowError or
        # OSError, depending on the platform, rather than ValueError.
        return "[BadTime: %s]" % int(nanounix)
189
STL class.
STL class.
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
all_permutations(input_list)
Definition general.py:112
get_package_version(package_name, default_prefix="DataQuality")
Definition general.py:14
silence(log_level=None, logger=None)
Definition general.py:141
all_equal(*inputs)
Definition general.py:106
_flatten(l, fn, val=[])
Definition general.py:133
date_to_nanounix(date_string)
Definition general.py:172
interleave(*args)
Definition general.py:169
daemonize(function, delay, error_args=None, **kwargs)
Definition general.py:43
nanounix_to_date(nanounix)
Definition general.py:179
kgrind_profile(filename="profile.kgrind")
Definition general.py:25
send_error_email(exception_info, from_, subject, body)
Definition general.py:62