ATLAS Offline Software
RunDependentMCTaskIterator.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration
2 
3 
9 
10 import itertools
11 import random
12 from operator import itemgetter
13 
def getRunLumiInfoFragment(jobnumber, task, maxEvents, totalEvents, skipEvents, sequentialEventNumbers=False):
    """Build the fragment of the digi task that belongs to one job.

    The job's maxEvents are split into two halves. One half is taken from
    the task sorted by descending 'mu', the other from the task sorted by
    ascending 'mu'; for an odd maxEvents the ascending half receives the
    extra event. According to the original author the aim is to give each
    job a similar CPU and cache load, since high-mu pieces take long to
    finish and low-mu pieces are short.

    The two halves are concatenated and sorted by 'run'. When
    sequentialEventNumbers is true the result is additionally expanded by
    defineSequentialEventNumbers (using jobnumber, totalEvents and
    skipEvents); otherwise the sorted fragment is returned directly.
    """
    # Split maxEvents into the high-mu and low-mu shares.
    if maxEvents % 2:
        n_from_high = int((maxEvents - 1) / 2)
        n_from_low = int((maxEvents + 1) / 2)
    else:
        n_from_high = n_from_low = int(maxEvents / 2)

    by_mu = itemgetter('mu')
    pieces = []
    if n_from_high > 0:
        pieces = pieces + getFragment(jobnumber, sorted(task, key=by_mu, reverse=True), n_from_high)
    if n_from_low > 0:
        pieces = pieces + getFragment(jobnumber, sorted(task, key=by_mu), n_from_low)

    # Stable sort: entries of the same run keep high-mu-before-low-mu order.
    pieces.sort(key=itemgetter('run'))

    if not sequentialEventNumbers:
        return pieces
    return defineSequentialEventNumbers(jobnumber, pieces, totalEvents, skipEvents)
41 
def getRandomlySampledRunLumiInfoFragment(jobnumber, task, maxEvents, totalEvents, skipEvents, sequentialEventNumbers=False):
    """Build the fragment for one job by sampling task entries at random.

    The module-level random generator is seeded with jobnumber, so a given
    job number always draws the same sequence. Each of the maxEvents draws
    picks a task entry with probability proportional to its 'evts' (via
    taskLookupTable) and emits a single-event copy of it. 'force_new' is
    carried over when the picked entry defines it, and 'evt_nbr' is added
    when sequentialEventNumbers is true (numbering starts at
    jobnumber * totalEvents + skipEvents + 1).

    Returns the sampled entries sorted by ('run', 'starttstamp').
    """
    # Seed per job so the sampling is reproducible.
    random.seed(jobnumber)

    # One slot per event; each slot holds the index of its task entry.
    lookup_table = taskLookupTable(task)
    last_slot = len(lookup_table) - 1

    first_evt_nbr = jobnumber * totalEvents + skipEvents
    sampled = []
    for draw in range(maxEvents):
        picked = task[lookup_table[random.randint(0, last_slot)]]

        entry = dict(
            run=picked['run'],
            lb=picked['lb'],
            starttstamp=picked['starttstamp'],
            evts=1,
            mu=picked['mu'],
        )
        if 'force_new' in picked:
            entry['force_new'] = picked['force_new']
        if sequentialEventNumbers:
            entry['evt_nbr'] = first_evt_nbr + draw + 1

        sampled.append(entry)

    sampled.sort(key=itemgetter('run', 'starttstamp'))
    return sampled
79 
def getFragment(jobnumber, task, maxEvents):
    """Return the list of task pieces that make up job number jobnumber.

    Positions a taskIterator at the requested job with findPlaceInTask,
    advances it once and returns its donejob list. Raises IndexError if
    findPlaceInTask signals StopIteration while looking for the job.
    """
    try:
        job_iter = findPlaceInTask(jobnumber, task, maxEvents)
    except StopIteration:
        n_jobs = sum(1 for _ in taskIterator(task, maxEvents)) + 1
        raise IndexError('There are only %i jobs in this task (not %i).' % (n_jobs, jobnumber + 1))

    # Process the requested job; an exhausted iterator simply leaves donejob as is.
    try:
        next(job_iter)
    except StopIteration:
        pass
    return job_iter.donejob
90 #
def findPlaceInTask(jobnumber, task, maxEvents):
    """Return a taskIterator positioned just before job number jobnumber.

    Each job tries to process maxEvents events; a 'force_new' flag on a
    task entry ends the job before that entry regardless of maxEvents.
    jobnumber is converted with int() and negative values are treated as 0.
    A StopIteration raised while skipping the preceding jobs is propagated
    to the caller, so wrap the call in try/except.
    """
    n_to_skip = max(int(jobnumber), 0)
    jobs = taskIterator(task, maxEvents)
    # Consume the jobs that come before the requested one.
    for _ in range(n_to_skip):
        next(jobs)
    return jobs
103 #
class taskIterator(object):
    """Iterator over a list of dicts (the 'task'), producing one job per step.

    Each dict must contain 'evts' and may contain 'force_new'. The task is
    cycled endlessly, so iteration does not end by itself. After every call
    to next() the attribute donejob holds copies of the task entries covered
    by that job, with 'evts' replaced by the number of events taken from
    each entry.
    """

    def __init__(self, task, step):
        """Create the iterator from task (a list of dicts) and step (the
        maximum number of events per job).

        Raises ValueError if the task is empty, if an entry lacks 'evts',
        if any 'evts' is negative, if the task could never complete a job
        (all 'evts' are 0 and no entry sets 'force_new'), or if step < 1.
        """
        self.step = step
        self.taskit = itertools.cycle(task)
        self.offset = 0       # events already consumed from self.current
        self.current = None   # task entry being processed; set on first next()
        self.donejob = []     # pieces making up the most recently produced job
        try:
            evts = [e['evts'] for e in task]
        except KeyError:
            raise ValueError("Cannot use tasks that don't always define 'evts':", task)
        if not evts or min(evts) < 0:
            raise ValueError("Cannot use empty task lists or negative N(events).")
        # With no events anywhere and no 'force_new' to cut a job short,
        # __next__ would cycle through the task forever.
        if max(evts) == 0 and not any(e.get('force_new', False) for e in task):
            raise ValueError("Cannot use task lists without any events (every 'evts' is 0).")
        if step < 1:
            raise ValueError("Cannot use step size smaller than 1 in a taskIterator.")

    def __eq__(self, another):
        """Two iterators are equal when they share the current entry and step."""
        return (self.current == another.current) and (self.step == another.step)

    def __iter__(self):
        return self

    def __repr__(self):
        # Before the first next() there is no current entry; report row 0,0.
        row = self.current if self.current is not None else {}
        return "offset=%i; row=%i,%i" % (self.offset, row.get('run', 0), row.get('lb', 0))

    def __next__(self):
        """Advance by one job and return (offset, current entry).

        The job collects up to self.step events, spilling over into the
        following task entries as needed. It ends early when the next entry
        has a true 'force_new'. The pieces are stored in self.donejob.
        """
        self.donejob = []
        if self.current is None:
            self.current = next(self.taskit)
        to_do = self.step
        while True:
            if to_do == 0:
                return self.offset, self.current
            can_do = self.current['evts'] - self.offset
            if can_do > to_do:
                # The current entry covers the rest of the job with events to spare.
                piece = self.current.copy()
                piece.update({'evts': to_do})
                self.donejob.append(piece)
                self.offset += to_do
                return self.offset, self.current
            # Use up what is left of the current entry and move to the next one.
            to_do -= can_do
            self.offset = 0
            piece = self.current.copy()
            piece.update({'evts': can_do})
            self.donejob.append(piece)
            self.current = next(self.taskit)
            if self.current.get('force_new', False):
                # A forced boundary ends the job before this entry.
                to_do = 0
153 #
154 
def taskLookupTable(task):
    """Generate the task lookup table.

    Returns a list with one slot per event: the index of every task entry
    is repeated 'evts' times, in task order.
    """
    return [index for index, entry in enumerate(task) for _ in range(entry['evts'])]
163 
def defineSequentialEventNumbers(jobnumber, fragment, totalEvents, skipEvents):
    """Expand a fragment into single-event entries with sequential numbers.

    Every entry of fragment is repeated 'evts' times as a new dict with
    'evts' set to 1 and a running 'evt_nbr'. Numbering starts at
    jobnumber * totalEvents + skipEvents + 1. Only 'run', 'lb',
    'starttstamp' and 'mu' are copied from the source entry.
    """
    numbering = itertools.count(jobnumber * totalEvents + skipEvents + 1)
    return [
        {
            'run': entry['run'],
            'lb': entry['lb'],
            'starttstamp': entry['starttstamp'],
            'evts': 1,
            'evt_nbr': next(numbering),
            'mu': entry['mu'],
        }
        for entry in fragment
        for _ in range(entry['evts'])
    ]
python.RunDependentMCTaskIterator.taskIterator.offset
offset
Definition: RunDependentMCTaskIterator.py:112
python.RunDependentMCTaskIterator.findPlaceInTask
def findPlaceInTask(jobnumber, task, maxEvents)
Definition: RunDependentMCTaskIterator.py:91
python.RunDependentMCTaskIterator.taskLookupTable
def taskLookupTable(task)
Definition: RunDependentMCTaskIterator.py:155
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
max
constexpr double max()
Definition: ap_fixedTest.cxx:33
min
constexpr double min()
Definition: ap_fixedTest.cxx:26
python.RunDependentMCTaskIterator.taskIterator.__iter__
def __iter__(self)
Definition: RunDependentMCTaskIterator.py:126
python.RunDependentMCTaskIterator.taskIterator.taskit
taskit
Definition: RunDependentMCTaskIterator.py:111
python.RunDependentMCTaskIterator.getFragment
def getFragment(jobnumber, task, maxEvents)
Definition: RunDependentMCTaskIterator.py:80
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.RunDependentMCTaskIterator.taskIterator.donejob
donejob
Definition: RunDependentMCTaskIterator.py:114
python.RunDependentMCTaskIterator.taskIterator.current
current
Definition: RunDependentMCTaskIterator.py:113
convertTimingResiduals.sum
sum
Definition: convertTimingResiduals.py:55
fillPileUpNoiseLumi.next
next
Definition: fillPileUpNoiseLumi.py:52
python.RunDependentMCTaskIterator.taskIterator.__next__
def __next__(self)
Definition: RunDependentMCTaskIterator.py:132
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
python.RunDependentMCTaskIterator.taskIterator.step
step
Definition: RunDependentMCTaskIterator.py:110
DerivationFramework::TriggerMatchingUtils::sorted
std::vector< typename T::value_type > sorted(T begin, T end)
Helper function to create a sorted vector from an unsorted one.
python.RunDependentMCTaskIterator.taskIterator.__eq__
def __eq__(self, another)
Definition: RunDependentMCTaskIterator.py:123
python.RunDependentMCTaskIterator.taskIterator.__init__
def __init__(self, task, step)
Definition: RunDependentMCTaskIterator.py:107
python.RunDependentMCTaskIterator.defineSequentialEventNumbers
def defineSequentialEventNumbers(jobnumber, fragment, totalEvents, skipEvents)
Definition: RunDependentMCTaskIterator.py:164
python.RunDependentMCTaskIterator.getRandomlySampledRunLumiInfoFragment
def getRandomlySampledRunLumiInfoFragment(jobnumber, task, maxEvents, totalEvents, skipEvents, sequentialEventNumbers=False)
Definition: RunDependentMCTaskIterator.py:42
python.RunDependentMCTaskIterator.taskIterator.__repr__
def __repr__(self)
Definition: RunDependentMCTaskIterator.py:129
if
if(febId1==febId2)
Definition: LArRodBlockPhysicsV0.cxx:567
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
pickleTool.object
object
Definition: pickleTool.py:30
calibdata.copy
bool copy
Definition: calibdata.py:27
WriteBchToCool.update
update
Definition: WriteBchToCool.py:67
python.RunDependentMCTaskIterator.getRunLumiInfoFragment
def getRunLumiInfoFragment(jobnumber, task, maxEvents, totalEvents, skipEvents, sequentialEventNumbers=False)
Definition: RunDependentMCTaskIterator.py:14
python.RunDependentMCTaskIterator.taskIterator
Definition: RunDependentMCTaskIterator.py:104