# ATLAS Offline Software
# DataQuality/DCSCalculator2/python/main.py
1# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2
3
4from multiprocessing.pool import Pool
5
6import logging; log = logging.getLogger("DCSCalculator2.main")
7
8from DQUtils.db import fetch_iovs
9from DQUtils.iov_arrangement import inverse_lblb, run_iovs_from_lblb
10from DQUtils.general import timer
11from DQUtils.utils import pprint_objects
12from DQUtils.logger import init_logger
13from DQUtils.sugar import RunLumi, IOVSet
14
15from DQDefects import DefectsDB
16
17from DCSCalculator2.subdetectors import ALL_SYSTEMS, SYSTEM_MAP
18from DCSCalculator2.variable import DefectIOV, DefectIOVFull
19
20import DCSCalculator2.config as config
21
22from contextlib import nullcontext
23
def maketime(n):
    """
    Render an IoV boundary value in a human-readable form.

    Values below 1000000 are returned unchanged (they are probably
    luminosity-block numbers rather than timestamps). Anything larger is
    divided by 1e9 to obtain seconds and formatted as ``"<h>h<m>m<s>s"``.
    The hour field is not wrapped at 24.
    """
    if n < 1000000:
        # Probably an LB.
        return n
    # divmod keeps each field consistent with the remainder passed on to the
    # next one, instead of relying on "%i" to truncate a float quotient.
    hours, remainder = divmod(n / 1e9, 3600)
    minutes, seconds = divmod(remainder, 60)
    return "%ih%im%is" % (hours, minutes, seconds)
32
34 if not result:
35 log.info("Empty Result.")
36 return
37 #print result
38
41
42
44
def run_one(system, lbtime, run_iovs):
    """
    Run the calculator for one `system` and return whatever its `run`
    method returns.

    If the system raises an Exception, a warning is logged and then:
      * the exception is re-raised when
        `config.opts.dont_ignore_system_exceptions` is set;
      * otherwise the traceback is logged, a panic message is sent when
        `config.opts.email_on_failure` is set, and None is returned.

    KeyboardInterrupt and SystemExit always propagate to the caller.
    """
    try:
        with timer("Run DCS calculator 2 for %s" % system):
            return system.run(lbtime, run_iovs)
    except (KeyboardInterrupt, SystemExit):
        # Never swallow an interrupt or an exit request.
        raise
    except Exception:
        log.warning("DCS Calculator failed to run for %s.", system)
        if config.opts.dont_ignore_system_exceptions:
            raise
        log.exception("Continuing. Use -e -X commandline to investigate")
        if not config.opts.email_on_failure:
            return None

        # Imported lazily: only needed on the failure path.
        from DataQualityUtils.panic import panic
        from traceback import format_exc
        run_number = lbtime[0].Run if len(lbtime) > 0 else '??????'
        panic("DCS Calculator failed to execute in run %s for system %s.\n\n%s"
              % (run_number, system, format_exc()))
        return None
63
def run_parallel(systems, lbtime, run_iovs, N):
    """
    Run every system in a multiprocessing pool and merge the results.

    `N` is the number of worker processes; when it is not positive, one
    worker per system is used. Each system is submitted through run_one(),
    and results are collected in the same order as `systems`.

    Returns an IOVSet holding the combined, non-empty results. An empty
    `systems` yields an empty IOVSet without creating a pool.
    """
    all_results = IOVSet()
    if not systems:
        # With N <= 0 the pool size would be len(systems) == 0, which Pool
        # rejects with ValueError; there is nothing to run anyway.
        return all_results

    # The context manager tears the pool down once all results are in
    # (or if collecting one of them raises), so no workers are leaked.
    with Pool(N if N > 0 else len(systems)) as pool:
        pending = [pool.apply_async(run_one, (system, lbtime, run_iovs))
                   for system in systems]

        for system, async_result in zip(systems, pending):
            result = async_result.get()
            log.info(" -- result for %s", system)
            print_system_iovs(result)
            if result:
                all_results.extend(result)

    return all_results
79
def run_sequential(systems, lbtime, run_iovs):
    """
    Run each system in turn through run_one() and merge the results.

    Every per-system result is passed to print_system_iovs(); results that
    are falsy (None or empty) are not merged. Returns the combined IOVSet.
    """
    collected = IOVSet()

    for system in systems:
        log.info(" -- running for %s", system)
        iovs = run_one(system, lbtime, run_iovs)
        print_system_iovs(iovs)
        if not iovs:
            continue
        collected.extend(iovs)

    return collected
91
def go(iov, systems, db, indb, timewise=False, use_flask=False):
    """
    Run the DCS calculator over the range `iov` for the `systems` specified,
    saving the resulting defects to the database `db`.

    Parameters:
      iov       -- (since, until) pair bounding the range to process.
      systems   -- iterable of subdetector classes; each one is instantiated
                   here unless it carries a truthy `__DISABLED__` attribute.
      db        -- output defect database; the string "None" disables
                   writing altogether.
      indb      -- input database for LBLB: used as-is when it starts with
                   'sqlite', otherwise appended to 'COOLONL_TRIGGER/'.
      timewise  -- build the run IoVs from the inverted (time) lumiblock
                   info instead of from the LBLB records.
      use_flask -- forwarded to DefectsDB.insert_multiple; when set, the
                   defect DB is opened read-only and the credentials are
                   loaded from the JSON file named by $COOLFLASK_SECRET
                   (or a default AFS path).

    Raises:
      AssertionError if no luminosity blocks are found for the range.
      Any exception from the upload step is re-raised after being logged
      (and after a panic message, when `config.opts.email_on_failure`).
    """
    since, until = iov

    with timer("Read LBLB"):
        # fetch lumi block info
        lblb_database = indb if indb.startswith('sqlite') else 'COOLONL_TRIGGER/%s' % indb
        lblb = fetch_iovs("LBLB", since, until, with_channel=False, database=lblb_database)
        assert lblb, "No luminosity blocks for desired range. [%s, %s)" % (since, until)

        # lumi block time info
        lbtime = inverse_lblb(lblb)

        # Run IoVs come from the time info when running timewise, otherwise
        # from the LBLB records themselves.
        run_iovs = run_iovs_from_lblb(lbtime if timewise else lblb)

    log.debug("Will process %i LBs over %i runs", len(lblb), len(run_iovs))

    log.info("DCSC2 will run over range: %r %s", tuple(run_iovs.range_iov), "(%i lbs)" % len(lblb))

    # Initialize the system objects
    # This changes 'systems' from a list of types to a list of subdetector objects
    systems = [system() for system in systems
               if not getattr(system, "__DISABLED__", False)]

    # Run systems sequentially for this range.
    # run_parallel() is not used here: it is broken because of the use of
    # closures to implement the defect translation.
    result_iovs = run_sequential(systems, lbtime, run_iovs)

    if log.isEnabledFor(logging.DEBUG):
        pprint_objects(result_iovs)

    log.info("Calculation complete")

    if db != "None":
        try:
            with timer("write result (%i iovs)" % len(result_iovs)):
                log.debug("Writing result (%i iovs)", len(result_iovs))
                defect_iovs = [result for result in result_iovs
                               if isinstance(result, DefectIOV)]
                defect_iovs_full = [DefectIOVFull(recoverable=False, user='sys:defectcalculator',
                                                  **defect._asdict())
                                    for defect in defect_iovs]
                # Marker recording that the calculator ran over this range.
                defect_iovs_full.append(DefectIOVFull(recoverable=False, user='sys:defectcalculator',
                                                      channel='GLOBAL_DCS_UNCHECKED', present=False,
                                                      comment='Calculator did run',
                                                      since=since, until=until))
                # Nothing is written unless at least one system defect exists.
                if len(defect_iovs) > 0:
                    # Report the read_only value actually passed below.
                    log.warning("db %s, read_only %s", db, use_flask)
                    ddb = DefectsDB(db, read_only=use_flask, create=not use_flask)
                    # Make sure every defect we are about to write exists.
                    for defect_name in {d.channel for d in defect_iovs_full}:
                        if defect_name not in ddb.defect_id_map:
                            ddb.create_defect(defect_name, "Created by DCSCalculator2")
                    with nullcontext() if use_flask else ddb.storage_buffer:
                        if use_flask:
                            import os
                            import json
                            secret_path = os.environ.get('COOLFLASK_SECRET', '/afs/cern.ch/user/a/atlasdqm/private/coolflask_secret/coolflask_secret.json')
                            # Close the secret file as soon as it is parsed.
                            with open(secret_path) as secret_file:
                                auth = json.load(secret_file)
                        else:
                            auth = {}
                        ddb.insert_multiple(defect_iovs_full, use_flask=use_flask, flask_auth=auth)
        except Exception:
            log.warning("DCS Calculator failed to upload defects to DB")
            if config.opts.email_on_failure:
                from DataQualityUtils.panic import panic
                from traceback import format_exc
                runnum = lbtime[0].Run if len(lbtime) > 0 else '??????'
                panicmsg = "DCS Calculator failed to upload defects to database for run %s\n\n%s" % (runnum, format_exc())
                panic(panicmsg)
            raise

    # "%08x" is the intended zero-padded hex field; the previous "%0x8"
    # printed the hash followed by a stray literal "8".
    log.info("Success. Calculated %i iovs. Result hash: 0x%08x.",
             len(result_iovs), hash(result_iovs))
175
def main(argv):
    """
    Command-line entry point.

    Parses `argv` with config.parse_options(), initialises logging, resolves
    the requested systems (all of them when none are given), builds the IoV
    from either a single run or a "first-last" run range (optionally narrowed
    with a "first-last" lumiblock range, valid only with a single run) and
    hands everything to go().

    Invalid option combinations are reported through the parser's error()
    method.
    """
    optp, opts, _args = config.parse_options(argv)

    init_logger(opts.verbose)

    log.debug("Commandline arguments: %s", argv)
    log.debug("Current configuration: %s", opts)

    if opts.shell_on_exception:
        # Drop into the debugger on any uncaught exception.
        import sys
        from IPython.core import ultratb
        sys.excepthook = ultratb.FormattedTB(call_pdb=True)

    if opts.systems is None:
        systems = ALL_SYSTEMS
    else:
        systems = []
        invalid_systems = []
        for system in opts.systems:
            if system not in SYSTEM_MAP:
                invalid_systems.append(system)
            else:
                systems.append(SYSTEM_MAP[system])

        if invalid_systems:
            # list(...) so the message shows the names rather than the
            # "dict_keys([...])" repr of a keys view.
            optp.error("Invalid system(s) specified: {0}. "
                       "Available systems: {1}".format(invalid_systems, list(SYSTEM_MAP)))

    if (opts.run and opts.range) or (not opts.run and not opts.range):
        optp.error("Specify either -r or -R")
    elif opts.run:
        since, until = opts.run, opts.run
    elif opts.range:
        since, until = map(int, opts.range.split("-"))

    if opts.lbs:
        if opts.range:
            optp.error("--lbs and --range are incompatible. "
                       "Use --lbs with --run")
        sincelb, untillb = map(int, opts.lbs.split("-"))
        iov = RunLumi(since, sincelb), RunLumi(until, untillb)
    else:
        # Whole runs: from LB 0 of the first run up to LB 0 of the run after
        # the last one.
        iov = RunLumi(since, 0), RunLumi(until+1, 0)

    go(iov, systems, opts.output_database, opts.input_database, opts.timewise, opts.flask)
226
# Doxygen cross-reference residue (documentation-page tooltips, not part of main.py):
#   STL class.
#   STL class.
#   run_parallel(systems, lbtime, run_iovs, N)
#   run_one(system, lbtime, run_iovs)
#   run_sequential(systems, lbtime, run_iovs)
#   go(iov, systems, db, indb, timewise=False, use_flask=False)
#   IovVectorMap_t read(const Folder &theFolder, const SelectionCriterion &choice, const unsigned int limit=10)