ATLAS Offline Software
Loading...
Searching...
No Matches
RunDependentMCTaskIterator.py
Go to the documentation of this file.
# Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration
2
3
9
10import itertools
11import random
12from operator import itemgetter
13
def getRunLumiInfoFragment(jobnumber,task,maxEvents,totalEvents,skipEvents,sequentialEventNumbers=False):
    """Calculate the specific configuration of the current job in the digi
    task. Try to make each fragment utilize the same amount of CPU and
    Cache resources. Exploits the fact that the task when sorted by
    mu value will have long finishing pieces near the beginning and
    short pieces near the end. A more even solution is obtained by
    pairing chunks of maxEvents/2 from the beginning and end of the
    sorted task.

    Arguments:
      jobnumber -- index of the job inside the task; forwarded to getFragment.
      task -- list of dicts; this function reads the 'mu' and 'run' keys.
      maxEvents -- number of events for this job. It is split into a
                   high-mu half (rounded down) and a low-mu half (rounded up).
      totalEvents, skipEvents -- only used when sequentialEventNumbers is
                   true, where they are forwarded to
                   defineSequentialEventNumbers.
      sequentialEventNumbers -- if true, return the result of
                   defineSequentialEventNumbers instead of the raw fragment.

    Returns the list of rows for this job, sorted by 'run'. An empty list is
    returned when maxEvents is not positive.
    """
    # Floor division keeps the split exact for any integer; the remainder
    # (0 or 1 event) goes to the low-mu half.
    hiMaxEvents = maxEvents // 2
    loMaxEvents = maxEvents - hiMaxEvents

    hi_mu_frag = []
    lo_mu_frag = []
    if hiMaxEvents > 0:
        # Same job index, but counted from the high-mu end of the task.
        hi_mu_frag = getFragment(jobnumber, sorted(task, key=lambda job: job['mu'], reverse=True), hiMaxEvents)
    if loMaxEvents > 0:
        # ... and counted from the low-mu end of the task.
        lo_mu_frag = getFragment(jobnumber, sorted(task, key=lambda job: job['mu']), loMaxEvents)

    fragment = sorted(hi_mu_frag + lo_mu_frag, key=lambda job: job['run'])
    if sequentialEventNumbers:
        return defineSequentialEventNumbers(jobnumber, fragment, totalEvents, skipEvents)
    return fragment
41
def getRandomlySampledRunLumiInfoFragment(jobnumber,task,maxEvents,totalEvents,skipEvents,sequentialEventNumbers=False):
    """Build the configuration of the current digi job by drawing task rows
    at random, one draw per event.

    Arguments:
      jobnumber -- used as the seed of the module-level random generator and
                   as part of the event-number base.
      task -- list of dicts; 'run', 'lb', 'starttstamp' and 'mu' are copied
              into every output row, 'force_new' only if the row has it.
      maxEvents -- number of rows to draw.
      totalEvents, skipEvents -- event numbers start after
                   jobnumber * totalEvents + skipEvents.
      sequentialEventNumbers -- if true, each output row gets an 'evt_nbr'.

    Returns a list of single-event rows ('evts' is always 1), sorted by
    ('run', 'starttstamp').
    """
    # Seeding with the job number makes the draws depend only on jobnumber.
    random.seed(jobnumber)

    # Maps a slot in the lookup table back to an index into task.
    row_of_slot = taskLookupTable(task)
    last_slot = len(row_of_slot) - 1

    numbering_base = jobnumber * totalEvents + skipEvents
    sampled = []
    for position in range(maxEvents):
        row = task[row_of_slot[random.randint(0, last_slot)]]

        entry = dict(
            run=row['run'],
            lb=row['lb'],
            starttstamp=row['starttstamp'],
            evts=1,
            mu=row['mu'],
        )
        if 'force_new' in row:
            entry['force_new'] = row['force_new']
        if sequentialEventNumbers:
            # First drawn event is numbering_base + 1, then counting upwards.
            entry['evt_nbr'] = numbering_base + position + 1

        sampled.append(entry)

    sampled.sort(key=itemgetter('run', 'starttstamp'))
    return sampled
79
def getFragment(jobnumber,task,maxEvents):
    """ Calculate the specific configuration of the current job in the digi task.

    Positions a taskIterator on job number 'jobnumber' (see findPlaceInTask),
    advances it by one job and returns the rows collected for that job (the
    iterator's 'donejob' list).

    Raises IndexError if the task runs out before 'jobnumber' is reached.
    """
    try:
        tIter = findPlaceInTask(jobnumber,task,maxEvents)
    except StopIteration:
        # The job count is deliberately not computed here: taskIterator cycles
        # over the task, so exhausting a fresh iterator to count its jobs
        # would never finish.
        raise IndexError('There is no job %i in this task.' % (jobnumber + 1))
    try:
        next(tIter)
    except StopIteration:
        # Whatever was collected before the iterator stopped is still returned.
        pass
    return tIter.donejob
90#
def findPlaceInTask(jobnumber,task,maxEvents):
    """ Return a taskIterator that has already been advanced past the first
    'jobnumber' jobs of the task, where each job tries to do maxEvents events.
    The 'force_new' flag in the LB list ends the task before that LB, ignoring maxEvents.
    Negative job numbers are treated as 0.
    Can raise StopIteration, so you should nest in try.
    """
    jobs_to_skip = max(int(jobnumber), 0)
    job_iter = taskIterator(task, maxEvents)
    for _ in range(jobs_to_skip):
        # A StopIteration raised here is left to propagate to the caller.
        next(job_iter)
    return job_iter
103#
class taskIterator(object):
    """Iterator over a list of dicts (the 'task'). Each dict must contain 'evts', optionally 'force_new'.

    Every call to next() carves the next job of at most 'step' events out of
    the task and stores the rows (copies, with 'evts' set to the number of
    events taken from that row) in the 'donejob' attribute. The task is
    cycled, so the iterator does not end by itself.
    """

    def __init__(self,task,step):
        """Create the iterator from task (a list of dicts) and step (the max number of evts. per job).

        Raises ValueError if the task is empty, contains a negative event
        count, contains no events at all, has a row without 'evts', or if
        step is smaller than 1.
        """
        self.step = step
        self.taskit = itertools.cycle(task)
        self.offset = 0       # events already used from the current row
        self.current = None   # row being consumed; None until the first next()
        self.donejob = []     # rows handed out by the most recent next()
        try:
            counts = [row['evts'] for row in task]
        except KeyError:
            raise ValueError("Cannot use tasks that don't always define 'evts':", task)
        if not counts or min(counts) < 0:
            raise ValueError("Cannot use empty task lists or negative N(events).")
        if max(counts) == 0:
            # Without a single event next() could never fill a job and would
            # cycle over the task forever.
            raise ValueError("Cannot use task lists that contain no events.")
        if step < 1:
            raise ValueError("Cannot use step size smaller than 1 in a taskIterator.")

    def __eq__(self, another):
        """Two iterators are equal if they sit on equal rows and use the same step."""
        if not isinstance(another, taskIterator):
            return NotImplemented
        return (self.current == another.current) and (self.step == another.step)

    def __iter__(self):
        return self

    def __repr__(self):
        # Before the first next() there is no current row; report run/lb as 0.
        row = self.current if self.current is not None else {}
        return "offset=%i; row=%i,%i" % (self.offset, row.get('run',0), row.get('lb',0))

    def __next__(self):
        """Advance by one job and return (offset, current row).

        The rows that make up the job are available in 'donejob' afterwards.
        """
        self.donejob = []
        if self.current is None:
            self.current = next(self.taskit)
        to_do = self.step
        while to_do != 0:
            can_do = self.current['evts'] - self.offset
            if can_do > to_do:
                # The current row has more events left than the job needs:
                # take a part of it and stay on this row.
                piece = self.current.copy()
                piece['evts'] = to_do
                self.donejob.append(piece)
                self.offset += to_do
                break
            # Use up the rest of the current row and move on to the next one.
            to_do -= can_do
            self.offset = 0
            piece = self.current.copy()
            piece['evts'] = can_do
            self.donejob.append(piece)
            self.current = next(self.taskit)
            if self.current.get('force_new', False):
                # A row flagged 'force_new' closes the job before that row.
                to_do = 0
        return self.offset, self.current
153#
154
def taskLookupTable(task):
    """ Generate task lookup table

    Returns a list in which the index of every task row appears as many
    times as that row has events ('evts'). Drawing a uniformly random entry
    from the table therefore picks a row with a probability proportional to
    its number of events.
    """
    table = []
    for i, item in enumerate(task):
        table.extend([i] * item['evts'])
    return table
163
def defineSequentialEventNumbers(jobnumber,fragment,totalEvents,skipEvents):
    """ Expand a fragment into single-event rows carrying sequential event numbers.

    Every row of 'fragment' is repeated 'evts' times with 'evts' set to 1.
    The rows are numbered consecutively in 'evt_nbr', starting at
    jobnumber * totalEvents + skipEvents + 1. Only 'run', 'lb',
    'starttstamp' and 'mu' are copied from the input rows.
    """
    numbering_base = jobnumber * totalEvents + skipEvents

    def _single_event_rows():
        number = numbering_base
        for row in fragment:
            for _ in range(row['evts']):
                number += 1
                yield {
                    'run': row['run'],
                    'lb': row['lb'],
                    'starttstamp': row['starttstamp'],
                    'evts': 1,
                    'evt_nbr': number,
                    'mu': row['mu'],
                }

    return list(_single_event_rows())
if(febId1==febId2)
#define min(a, b)
Definition cfImp.cxx:40
#define max(a, b)
Definition cfImp.cxx:41
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition hcg.cxx:130
getRandomlySampledRunLumiInfoFragment(jobnumber, task, maxEvents, totalEvents, skipEvents, sequentialEventNumbers=False)
findPlaceInTask(jobnumber, task, maxEvents)
getRunLumiInfoFragment(jobnumber, task, maxEvents, totalEvents, skipEvents, sequentialEventNumbers=False)
defineSequentialEventNumbers(jobnumber, fragment, totalEvents, skipEvents)