ATLAS Offline Software
python.trfGraph.executorGraph Class Reference

Simple graph object describing the links between executors.


Public Member Functions

def __init__ (self, executorSet, inputData=set([]), outputData=set([]))
 Initialise executor graph.
 
def inputData (self)
 
def inputData (self, inputData)
 
def outputData (self)
 
def outputData (self, outputData)
 
def execution (self)
 Return a list of execution nodes with their data inputs/outputs.
 
def data (self)
 Return a list of all data used in this execution.
 
def addNode (self, executor)
 Add an executor node to the graph.
 
def deleteNote (self, executor)
 Remove an executor node from the graph.
 
def findConnections (self)
 Look at executor nodes and work out how they are connected.
 
def doToposort (self)
 Find a topologically sorted list of the graph nodes.
 
def findExecutionPath (self)
 Find the graph's execution nodes, from input to output data types, with each activated step and its inputs/outputs.
 
def __str__ (self)
 Nodes in topologically sorted order, if available, else sorted name order; pseudo-nodes (names starting with '_') are omitted.
 
def __repr__ (self)
 Nodes in topologically sorted order, if available, else sorted name order; pseudo-nodes are included.
 

Private Member Functions

def _resetConnections (self)
 
def _bestPath (self, data, dataAvailable, startNodeName='_start', endNodeName=None)
 Find the best path from an end node to a start node, producing a certain type of data given the set of currently available data and the current set of activated nodes.
 
def _extendPath (self, path, currentNodeName, nextNodeName)
 Connect a path to a particular node.
 

Private Attributes

 _nodeDict
 
 _inputData
 
 _outputData
 
 _toposort
 
 _toposortData
 
 _execution
 

Detailed Description

Simple graph object describing the links between executors.

Definition at line 42 of file trfGraph.py.

Constructor & Destructor Documentation

◆ __init__()

def python.trfGraph.executorGraph.__init__ (self, executorSet, inputData=set([]), outputData=set([]))

Initialise executor graph.

Parameters
    executorSet   Set of executor instances
    inputData     Iterable with input data for this transform's execution
    outputData    Iterable with output data for this transform's execution

Definition at line 48 of file trfGraph.py.

48      def __init__(self, executorSet, inputData = set([]), outputData = set([])):
49
50          # Set basic node list
51          self._nodeDict = {}
52
53          msg.info('Transform graph input data: {0}; output data {1}'.format(inputData, outputData))
54
55          if len(executorSet) == 1:
56              # Single executor - in this case inData/outData is not mandatory, so we set them to the
57              # input/output data of the transform
58              executor = list(executorSet)[0]
59              if len(executor._inData) == 0 and len(executor._outData) == 0:
60                  executor.inData = inputData
61                  executor.outData = outputData
62
63          for executor in executorSet:
64              self.addNode(executor)
65
66          self._inputData = set(inputData)
67          self._outputData = set(outputData)
68
69          # It's forbidden for a transform to consume and produce the same datatype
70          dataOverlap = self._inputData & self._outputData
71          if len(dataOverlap) > 0:
72              raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
73                  'Transform definition error, you cannot produce and consume the same datatypes in a transform. Duplicated input/output types {0}.'.format(' '.join(dataOverlap)))
74
75          # Add a pseudo-start/stop nodes, from which input data flows and output data finally arrives
76          # This makes the graph 'concrete' for this job
77          # This is useful as then data edges all connect properly to a pair of nodes
78          # We add a node for every possible output as this enables topo sorting of the graph
79          # nodes for any intermediate data end nodes as well
80          pseudoNodes = dict()
81          pseudoNodes['_start'] = graphNode(name='_start', inData=[], outData=self._inputData, weight = 0)
82          for node in self._nodeDict.values():
83              for dataType in node.outputDataTypes:
84                  endNodeName = '_end_{0}'.format(dataType)
85                  pseudoNodes[endNodeName] = graphNode(name=endNodeName, inData=[dataType], outData=[], weight = 0)
86          self._nodeDict.update(pseudoNodes)
87
88          # Toposort not yet done
89          self._toposort = []
90          self._toposortData = []
91
92          # Now find connections between nodes
93          self.findConnections()
94 
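
The overlap check and the pseudo-node naming done by the constructor can be tried in isolation. The following is a minimal sketch rather than the ATLAS implementation: `check_overlap` and `pseudo_node_names` are hypothetical helpers, a plain `ValueError` stands in for `TransformSetupException`, and the node and data type names are made up for illustration.

```python
def check_overlap(input_data, output_data):
    """Reject a transform that both consumes and produces a data type."""
    overlap = set(input_data) & set(output_data)
    if overlap:
        # The listing above raises trfExceptions.TransformSetupException here;
        # ValueError is a stand-in so that the sketch is self-contained.
        raise ValueError('Duplicated input/output types {0}'.format(' '.join(sorted(overlap))))


def pseudo_node_names(node_outputs):
    """Return '_start' plus one '_end_<type>' name per producible data type.

    node_outputs maps a (hypothetical) node name to the data types it can output.
    """
    names = {'_start'}
    for data_types in node_outputs.values():
        for data_type in data_types:
            names.add('_end_{0}'.format(data_type))
    return names


check_overlap({'RAW'}, {'ESD', 'AOD'})  # distinct types: no exception
names = pseudo_node_names({'RAWtoESD': ['ESD'], 'ESDtoAOD': ['AOD']})
print(sorted(names))  # ['_end_AOD', '_end_ESD', '_start']
```

One end node per producible data type means that any intermediate data type can serve as the target of a path search, not only the final output of the transform.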

Member Function Documentation

◆ __repr__()

def python.trfGraph.executorGraph.__repr__ (   self)

Nodes in topologically sorted order, if available, else sorted name order. Unlike __str__, pseudo-nodes (names starting with '_') are included.

Definition at line 416 of file trfGraph.py.

416      def __repr__(self):
417          nodeStrList = []
418          if len(self._toposort) > 0:
419              nodeNames = self._toposort
420          else:
421              nodeNames = list(self._nodeDict)
422              nodeNames.sort()
423          for nodeName in nodeNames:
424              nodeStrList.append(repr(self._nodeDict[nodeName]))
425          return os.linesep.join(nodeStrList)
426 
427 

◆ __str__()

def python.trfGraph.executorGraph.__str__ (   self)

Nodes in topologically sorted order, if available, else sorted name order. Pseudo-nodes (names starting with '_') are omitted.

Definition at line 402 of file trfGraph.py.

402      def __str__(self):
403          nodeStrList = []
404          if len(self._toposort) > 0:
405              nodeNames = self._toposort
406          else:
407              nodeNames = list(self._nodeDict)
408              nodeNames.sort()
409          for nodeName in nodeNames:
410              if not nodeName.startswith('_'):
411                  nodeStrList.append(str(self._nodeDict[nodeName]))
412          return os.linesep.join(nodeStrList)
413 
414 

◆ _bestPath()

def python.trfGraph.executorGraph._bestPath (self, data, dataAvailable, startNodeName='_start', endNodeName=None)
private

Find the best path from an end node to a start node, producing a certain type of data given the set of currently available data and the current set of activated nodes.

Parameters
    data            Data to produce
    dataAvailable   Data types which can be used as sources
    startNodeName   Find the path to this node (default '_start')
    endNodeName     Find the path from this node (default '_end_DATATYPE')

We can always ask the algorithm to trace the path from end to start for this data type (this data is in the end node by construction). If we have to go along an edge where the data is not yet available, then we need to add this data to our list of data to produce.

Definition at line 299 of file trfGraph.py.

299      def _bestPath(self, data, dataAvailable, startNodeName = '_start', endNodeName = None):
300
301          if endNodeName is None:
302              endNodeName = '_end_{0}'.format(data)
303
304          if endNodeName not in self._nodeDict:
305              raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
306                  'Node {0} was not found - the transform data connection definition is broken'.format(endNodeName))
307
308
309          # Set of all considered paths
310          # Initialise this with our endNode name - algorithm works back to the start
311          pathSet = [graphPath(endNodeName, data),]
312
313          msg.debug('Started path finding with seed path {0}'.format(pathSet[0]))
314
315          # Halting condition - only one path and its first element is startNodeName
316          while len(pathSet) > 1 or pathSet[0].path[0] != startNodeName:
317              msg.debug('Starting best path iteration with {0} paths in {1}'.format(len(pathSet), pathSet))
318              # Copy the pathSet to do this, as we will update it
319              for path in pathSet[:]:
320                  msg.debug('Continuing path finding with path {0}'.format(path))
321                  currentNodeName = path.path[0]
322                  if currentNodeName == startNodeName:
323                      msg.debug('Path {0} has reached the start node - finished'.format(path))
324                      continue
325                  # If there are no paths out of this node then it's a dead end - kill it
326                  if len(self._nodeDict[currentNodeName].connections['in']) == 0:
327                      msg.debug('Path {0} is a dead end - removing'.format(path))
328                      pathSet.remove(path)
329                      continue
330                  # If there is only one path out of this node, we extend it
331                  if len(self._nodeDict[currentNodeName].connections['in']) == 1:
332                      msg.debug('Single exit from path {0} - adding connection to {1}'.format(path, list(self._nodeDict[currentNodeName].connections['in'])[0]))
333                      self._extendPath(path, currentNodeName, list(self._nodeDict[currentNodeName].connections['in'])[0])
334                      continue
335                  # Else we need to clone the path for each possible exit
336                  msg.debug('Multiple exits from path {0} - will clone for each extra exit'.format([path]))
337                  for nextNodeName in list(self._nodeDict[currentNodeName].connections['in'])[1:]:
338                      newPath = copy.deepcopy(path)
339                      msg.debug('Cloned exit from path {0} to {1}'.format(newPath, nextNodeName))
340                      self._extendPath(newPath, currentNodeName, nextNodeName)
341                      pathSet.append(newPath)
342                  # Finally, use the original path to extend along the first node exit
343                  msg.debug('Adding exit from original path {0} to {1}'.format(path, list(self._nodeDict[currentNodeName].connections['in'])[0]))
344                  self._extendPath(path, currentNodeName, list(self._nodeDict[currentNodeName].connections['in'])[0])
345
346              # Now compare paths which made it to the end - only keep the shortest
347              lowestCostPath = None
348              for path in pathSet[:]:
349                  currentNodeName = path.path[0]
350                  if currentNodeName == startNodeName:
351                      if lowestCostPath is None:
352                          lowestCostPath = path
353                          continue
354                      if path.cost >= lowestCostPath.cost:
355                          msg.debug('Path {0} is no cheaper than best path {1} - removing'.format(path, lowestCostPath))
356                          pathSet.remove(path)
357                      else:
358                          msg.debug('Path {0} is cheaper than previous best path {1} - removing previous'.format(path, lowestCostPath))
359                          pathSet.remove(lowestCostPath)
360                          lowestCostPath = path
361
362              # Emergency break
363              if len(pathSet) == 0:
364                  raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
365                      'No path found between {0} and {1} for {2}'.format(startNodeName, endNodeName, data))
366          return pathSet[0]
367 
368 
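
The backwards search in `_bestPath` can be illustrated on a small graph. The sketch below is a simplified stand-in, not the method itself: `best_path` is a hypothetical function, the graph is a plain dictionary instead of `graphNode` objects, every node has a fixed made-up activation cost, and all candidate paths are enumerated before the cheapest is chosen (the listing above prunes the more expensive completed paths on every iteration instead).

```python
def best_path(incoming, cost, end, start='_start'):
    """Return (total cost, path) for the cheapest path from start to end.

    incoming maps a node name to the names of the nodes feeding it;
    cost maps a node name to the (hypothetical) cost of activating it.
    The search walks backwards from end and assumes the graph is a DAG.
    """
    candidates = [[end]]  # each path is stored with its start-most node first
    finished = []
    while candidates:
        path = candidates.pop()
        head = path[0]
        if head == start:
            finished.append(path)
            continue
        # A node with no incoming edges is a dead end: the path is dropped
        for upstream in incoming.get(head, ()):
            candidates.append([upstream] + path)
    if not finished:
        raise ValueError('No path found between {0} and {1}'.format(start, end))
    return min((sum(cost.get(n, 0) for n in p), p) for p in finished)


incoming = {
    '_end_AOD': ['ESDtoAOD', 'RAWtoALL'],
    'ESDtoAOD': ['RAWtoESD'],
    'RAWtoESD': ['_start'],
    'RAWtoALL': ['_start'],
}
cost = {'RAWtoESD': 1, 'ESDtoAOD': 1, 'RAWtoALL': 3}
print(best_path(incoming, cost, '_end_AOD'))
# (2, ['_start', 'RAWtoESD', 'ESDtoAOD', '_end_AOD'])
```

Searching from the end node backwards keeps the search focused on nodes that can actually contribute to the requested data type.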

◆ _extendPath()

def python.trfGraph.executorGraph._extendPath (self, path, currentNodeName, nextNodeName)
private

Connect a path to a particular node.

Parameters
    path              graphPath instance
    currentNodeName   Node currently at the head of the path
    nextNodeName      Node to connect to

Definition at line 372 of file trfGraph.py.

372      def _extendPath(self, path, currentNodeName, nextNodeName):
373          edgeData = self._nodeDict[currentNodeName].connections['in'][nextNodeName]
374          msg.debug('Connecting {0} to {1} with data {2}'.format(currentNodeName, nextNodeName, edgeData))
375
376          extraData = set()
377          if self._execution[currentNodeName]['enabled'] is True:
378              extraCost = 0
379          else:
380              for edgeDataElement in edgeData:
381                  # Simple case - one data connection only
382                  if edgeDataElement in self._nodeDict[currentNodeName].inData:
383                      extraCost = self._nodeDict[currentNodeName].weights[edgeDataElement]
384                  else:
385                      # Complex case - the start requirement for this node must be multi-data
386                      # Only the first match in the dataIn lists is considered
387                      # This will break if there are multiple overlapping dataIn requirements
388                      for nodeStartData in self._nodeDict[currentNodeName].inData:
389                          if isinstance(nodeStartData, (list, tuple)) and edgeDataElement in nodeStartData:
390                              extraCost = self._nodeDict[currentNodeName].weights[nodeStartData]
391                              msg.debug('Found multi-data exit from {0} to {1} - adding {2} to data requirements'.format(currentNodeName, nextNodeName, nodeStartData))
392                              extraData.update(nodeStartData)
393                              break
394              # Remove data which is on the edge itself
395              extraData.difference_update(edgeData)
396
397          msg.debug('Updating path {0} with {1}, {2}, {3}, {4}'.format(path, nextNodeName, edgeData, extraData, extraCost))
398          path.addToPath(nextNodeName, edgeData, extraData, extraCost)
399 
400 
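
The cost and extra-data lookup for a node that is not yet enabled can be sketched on plain containers. This is an illustration under stated assumptions: `edge_cost` is a hypothetical helper, the requirement list and weights are made up, and the cost defaults to 0 when nothing matches (the listing above leaves `extraCost` unassigned in that case).

```python
def edge_cost(in_data, weights, edge_data):
    """Return (cost, extra data) for entering a node along an edge.

    in_data lists the node's input requirements: each entry is either a single
    data type or a tuple of types that must all be present together.
    weights maps each entry of in_data to its cost.
    """
    cost = 0
    extra = set()
    for element in edge_data:
        if element in in_data:
            # Simple case: the edge data is itself one of the requirements
            cost = weights[element]
            continue
        for requirement in in_data:
            # Multi-data case: only the first matching requirement is used
            if isinstance(requirement, (list, tuple)) and element in requirement:
                cost = weights[requirement]
                extra.update(requirement)
                break
    # Data carried by the edge itself does not count as extra
    extra.difference_update(edge_data)
    return cost, extra


in_data = ['ESD', ('HITS', 'RDO_BKG')]
weights = {'ESD': 1, ('HITS', 'RDO_BKG'): 2}
print(edge_cost(in_data, weights, ['HITS']))  # (2, {'RDO_BKG'})
print(edge_cost(in_data, weights, ['ESD']))   # (1, set())
```

The extra data returned for a multi-data requirement is what the caller later has to add to its list of data to produce.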

◆ _resetConnections()

def python.trfGraph.executorGraph._resetConnections (   self)
private

Definition at line 148 of file trfGraph.py.

148      def _resetConnections(self):
149          for node in self._nodeDict.values():
150              node.resetConnections()
151 

◆ addNode()

def python.trfGraph.executorGraph.addNode (self, executor)

Add an executor node to the graph.

Definition at line 138 of file trfGraph.py.

138      def addNode(self, executor):
139          self._nodeDict[executor.name] = executorNode(executor)
140 
141 

◆ data()

def python.trfGraph.executorGraph.data (   self)

Return a list of all data used in this execution.

Definition at line 126 of file trfGraph.py.

126      def data(self):
127          dataset = set()
128          for nodeName in self._toposort:
129              # Start and end nodes are not real - they never actually execute
130              if nodeName.startswith(('_start', '_end')):
131                  continue
132              if self._execution[nodeName]['enabled'] is True:
133                  dataset.update(self._execution[nodeName]['input'])
134                  dataset.update(self._execution[nodeName]['output'])
135          return dataset
136 

◆ deleteNote()

def python.trfGraph.executorGraph.deleteNote (self, executor)

Remove an executor node from the graph.

Definition at line 143 of file trfGraph.py.

143      def deleteNote(self, executor):
144          if executor.name in self._nodeDict:
145              del(self._nodeDict[executor.name])
146 
147 

◆ doToposort()

def python.trfGraph.executorGraph.doToposort (   self)

Find a topologically sorted list of the graph nodes.

Note
    If this is not possible, the graph is not a DAG, which is not supported.
    See http://en.wikipedia.org/wiki/Topological_sorting

Definition at line 171 of file trfGraph.py.

171      def doToposort(self):
172          # We will manipulate the graph, so deepcopy it
173          graphCopy = copy.deepcopy(self._nodeDict)
174          # Find all valid start nodes in this graph - ones with no data dependencies themselves
175          startNodeNames = []
176          for nodeName, node in graphCopy.items():
177              if len(node.connections['in']) == 0:
178                  startNodeNames.append(nodeName)
179
180          if len(startNodeNames) == 0:
181              raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
182                  'There are no starting nodes in this graph - non-DAG graphs are not supported')
183
184          msg.debug('Found this list of start nodes for toposort: {0}'.format(startNodeNames))
185
186          # The startNodeNames holds the list of nodes with their dependencies now satisfied (no input edges anymore)
187          while len(startNodeNames) > 0:
188              # Take the next startNodeName and zap it from the graph
189              theNodeName = startNodeNames.pop()
190              theNode = graphCopy[theNodeName]
191              self._toposort.append(theNodeName)
192              del graphCopy[theNodeName]
193
194              # Now delete the edges this node was a source for
195              msg.debug('Considering connections from node {0}'.format(theNodeName))
196              for connectedNodeName in theNode.connections['out']:
197                  graphCopy[connectedNodeName].delConnection(toExe = theNodeName, direction = 'in')
198                  # Look for nodes which now have their dependencies satisfied
199                  if len(graphCopy[connectedNodeName].connections['in']) == 0:
200                      startNodeNames.append(connectedNodeName)
201
202          # If there are nodes left then the graph has cycles, which means it's not a DAG
203          if len(graphCopy) > 0:
204              raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
205                  'Graph topological sort had no more start nodes, but nodes were left {0} - non-DAG graphs are not supported'.format(list(graphCopy)))
206
207          msg.debug('Topologically sorted node order: {0}'.format(self._toposort))
208
209          # Now toposort the input data for nodes
210          self._toposortData = []
211          for nodeName in self._toposort:
212              # First add input data, then output data
213              for dataType in self._nodeDict[nodeName].inputDataTypes:
214                  if dataType not in self._toposortData:
215                      self._toposortData.append(dataType)
216              for dataType in self._nodeDict[nodeName].outputDataTypes:
217                  if dataType not in self._toposortData:
218                      self._toposortData.append(dataType)
219
220          msg.debug('Topologically sorted data order: {0}'.format(self._toposortData))
221 
222 
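
The node ordering step follows the usual Kahn-style procedure: repeatedly take a node with no remaining incoming edges, emit it, and remove its outgoing edges. A minimal standalone sketch is shown below; `toposort` is a hypothetical function working on a plain dictionary, `ValueError` stands in for `TransformGraphException`, and the node names are made up.

```python
def toposort(connections):
    """Kahn-style topological sort.

    connections maps node name -> {'in': upstream names, 'out': downstream names}.
    """
    # Work on a copy of the incoming edges so the caller's graph is untouched
    remaining_in = {name: set(c['in']) for name, c in connections.items()}
    ready = [name for name, deps in remaining_in.items() if not deps]
    if not ready:
        raise ValueError('No starting nodes - graph is not a DAG')

    order = []
    while ready:
        name = ready.pop()
        order.append(name)
        del remaining_in[name]
        # Remove the edges this node was a source for
        for downstream in connections[name]['out']:
            remaining_in[downstream].discard(name)
            if not remaining_in[downstream]:
                ready.append(downstream)

    # Anything left over still has unsatisfied dependencies, so there is a cycle
    if remaining_in:
        raise ValueError('Cycle detected among {0}'.format(sorted(remaining_in)))
    return order


graph = {
    '_start':   {'in': set(),        'out': {'RAWtoESD'}},
    'RAWtoESD': {'in': {'_start'},   'out': {'ESDtoAOD'}},
    'ESDtoAOD': {'in': {'RAWtoESD'}, 'out': {'_end_AOD'}},
    '_end_AOD': {'in': {'ESDtoAOD'}, 'out': set()},
}
print(toposort(graph))  # ['_start', 'RAWtoESD', 'ESDtoAOD', '_end_AOD']
```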

◆ execution()

def python.trfGraph.executorGraph.execution (   self)

Return a list of execution nodes with their data inputs/outputs.

Definition at line 113 of file trfGraph.py.

113      def execution(self):
114          exeList = []
115          for nodeName in self._toposort:
116              # Start and end nodes are not real - they never actually execute
117              if nodeName.startswith(('_start', '_end')):
118                  continue
119              if self._execution[nodeName]['enabled'] is True:
120                  exeList.append({'name': nodeName, 'input': self._execution[nodeName]['input'],
121                                  'output': self._execution[nodeName]['output']})
122          return exeList
123 

◆ findConnections()

def python.trfGraph.executorGraph.findConnections (   self)

Look at executor nodes and work out how they are connected.

Note
    Every pair of nodes is compared, so this is O(n^2). That should be fine for our low numbers of nodes, but it could be optimised.

Definition at line 154 of file trfGraph.py.

154      def findConnections(self):
155          self._resetConnections()
156          for nodeNameA, nodeA in self._nodeDict.items():
157              for nodeNameB, nodeB in self._nodeDict.items():
158                  if nodeNameA == nodeNameB:
159                      continue
160                  dataIntersection = list(set(nodeA.outputDataTypes) & set(nodeB.inputDataTypes))
161                  msg.debug('Data connections between {0} and {1}: {2}'.format(nodeNameA, nodeNameB, dataIntersection))
162                  if len(dataIntersection) > 0:
163                      nodeA.addConnection(nodeNameB, dataIntersection, direction='out')
164                      nodeB.addConnection(nodeNameA, dataIntersection, direction='in')
165
166          msg.debug('Graph connections are: \n{0}'.format(self))
167 
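
The pairwise comparison can be reproduced on plain tuples to see which edges it yields. This is a minimal sketch with a hypothetical `find_connections` function and made-up node definitions; the real method records the edges on the node objects through `addConnection` instead of returning a list.

```python
def find_connections(nodes):
    """Return (producer, consumer, shared data types) edges.

    nodes maps a node name to a pair (input data types, output data types).
    """
    edges = []
    for name_a, (_, out_a) in nodes.items():
        for name_b, (in_b, _) in nodes.items():
            if name_a == name_b:
                continue
            # An edge exists wherever A produces something that B consumes
            shared = set(out_a) & set(in_b)
            if shared:
                edges.append((name_a, name_b, shared))
    return edges


nodes = {
    '_start':   ((), ('RAW',)),
    'RAWtoESD': (('RAW',), ('ESD',)),
    'ESDtoAOD': (('ESD',), ('AOD',)),
    '_end_AOD': (('AOD',), ()),
}
for producer, consumer, shared in find_connections(nodes):
    print('{0} -> {1}: {2}'.format(producer, consumer, sorted(shared)))
```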

◆ findExecutionPath()

def python.trfGraph.executorGraph.findExecutionPath (   self)

Find the graph's execution nodes, from input to output data types with each activated step and the inputs/outputs.

Definition at line 227 of file trfGraph.py.

227      def findExecutionPath(self):
228          # Switch off all nodes, except if we have a single node which is not data driven...
229          self._execution = {}
230          for nodeName, node in self._nodeDict.items():
231              if len(self._nodeDict) == 1 and node.inputDataTypes == set() and node.inputDataTypes == set():
232                  self._execution[nodeName] = {'enabled' : True, 'input' : set(), 'output' : set()}
233              else:
234                  self._execution[nodeName] = {'enabled' : False, 'input' : set(), 'output' : set()}
235
236          dataToProduce = copy.deepcopy(self._outputData)
237          dataAvailable = copy.deepcopy(self._inputData)
238
239          # Consider the next data type in topo order
240          while len(dataToProduce) > 0:
241              nextDataType = None
242              for dataType in self._toposortData:
243                  if dataType in dataToProduce:
244                      nextDataType = dataType
245                      dataToProduce.remove(nextDataType)
246                      dataAvailable.update([nextDataType])
247                      break
248
249              if not nextDataType:
250                  msg.error('Still have to produce data type(s) {0}, but did not find anything in the toposorted data list ({1}).'
251                            ' Transform parameters/graph are broken so aborting.'.format(dataToProduce, self._toposortData))
252                  raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
253                      'Data type graph error')
254
255              msg.debug('Next data type to try is {0}'.format(nextDataType))
256              bestPath = self._bestPath(nextDataType, dataAvailable)
257
258              msg.debug('Found best path for {0}: {1}'.format(nextDataType, bestPath))
259
260
261              modPath = bestPath.path + [None]
262              for (nodeName, nextNodeName) in [ (n, modPath[modPath.index(n)+1]) for n in bestPath.path ]:
263                  self._execution[nodeName]['enabled'] = True
264                  # Add the necessary data types to the output of the first node and the input of the next
265                  if nodeName in bestPath.newData:
266                      self._execution[nodeName]['output'].update(bestPath.newData[nodeName])
267                      for newData in bestPath.newData[nodeName]:
268                          if newData not in dataAvailable:
269                              dataToProduce.update([newData])
270                  if nextNodeName:
271                      self._execution[nextNodeName]['input'].update(bestPath.newData[nodeName])
272                      if nextNodeName in bestPath.extraData:
273                          self._execution[nextNodeName]['input'].update(bestPath.extraData[nodeName])
274              # Add any extra data we need (from multi-exit nodes) to the data to produce list
275              for extraNodeData in bestPath.extraData.values():
276                  for extra in extraNodeData:
277                      if extra not in dataAvailable:
278                          dataToProduce.update([extra])
279
280          # Now remove the fake data objects from activated nodes
281          for node, props in self._execution.items():
282              msg.debug('Removing fake data from node {0}'.format(node))
283              props['input'] -= set(['inNULL', 'outNULL'])
284              props['output'] -= set(['inNULL', 'outNULL'])
285
286          msg.debug('Execution dictionary: {0}'.format(self._execution))
287 
288 
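
Two small steps of this method are easy to isolate: choosing the next data type in topological order, and stripping the placeholder types at the end. The sketch below uses hypothetical helper names (`next_data_type`, `strip_fake_data`) and made-up data; it does not reproduce the path activation logic in between.

```python
FAKE_DATA = {'inNULL', 'outNULL'}  # placeholder types removed in the listing above


def next_data_type(toposort_data, data_to_produce):
    """Return the first entry of toposort_data still to be produced, or None."""
    for data_type in toposort_data:
        if data_type in data_to_produce:
            return data_type
    return None


def strip_fake_data(execution):
    """Remove the placeholder data types from every node's input and output."""
    for props in execution.values():
        props['input'] -= FAKE_DATA
        props['output'] -= FAKE_DATA


toposort_data = ['RAW', 'ESD', 'AOD']
print(next_data_type(toposort_data, {'AOD', 'ESD'}))  # ESD
print(next_data_type(toposort_data, {'NTUP'}))        # None

execution = {'RAWtoESD': {'enabled': True, 'input': {'RAW', 'inNULL'}, 'output': {'ESD'}}}
strip_fake_data(execution)
print(execution['RAWtoESD']['input'])  # {'RAW'}
```

A `None` result corresponds to the error branch in the listing: something still has to be produced, but no node in the sorted graph can produce it.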

◆ inputData() [1/2]

def python.trfGraph.executorGraph.inputData (   self)

Definition at line 96 of file trfGraph.py.

96      def inputData(self):
97          return self._inputData
98 

◆ inputData() [2/2]

def python.trfGraph.executorGraph.inputData (self, inputData)

Definition at line 100 of file trfGraph.py.

100      def inputData(self, inputData):
101          self._inputData = set(inputData)
102 

◆ outputData() [1/2]

def python.trfGraph.executorGraph.outputData (   self)

Definition at line 104 of file trfGraph.py.

104      def outputData(self):
105          return self._outputData
106 

◆ outputData() [2/2]

def python.trfGraph.executorGraph.outputData (self, outputData)

Definition at line 108 of file trfGraph.py.

108      def outputData(self, outputData):
109          self._outputData = set(outputData)
110 
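
The paired [1/2] and [2/2] entries for inputData and outputData look like a property getter and setter. The decorator lines are not part of the listings above, so this is an assumption; the sketch below shows how such a pair would behave, using a hypothetical `DataHolder` class in place of executorGraph.

```python
class DataHolder:
    """Hypothetical stand-in for executorGraph, reduced to one property."""

    def __init__(self, inputData=()):
        self._inputData = set(inputData)

    @property
    def inputData(self):
        return self._inputData

    @inputData.setter
    def inputData(self, inputData):
        # Any iterable is accepted and normalised to a set
        self._inputData = set(inputData)


holder = DataHolder()
holder.inputData = ['RAW', 'RAW', 'RDO']
print(sorted(holder.inputData))  # ['RAW', 'RDO']
```

As in the listings, the setter only replaces the stored set; it does not repeat the input/output overlap check made in the constructor.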

Member Data Documentation

◆ _execution

python.trfGraph.executorGraph._execution
private

Definition at line 229 of file trfGraph.py.

◆ _inputData

python.trfGraph.executorGraph._inputData
private

Definition at line 66 of file trfGraph.py.

◆ _nodeDict

python.trfGraph.executorGraph._nodeDict
private

Definition at line 51 of file trfGraph.py.

◆ _outputData

python.trfGraph.executorGraph._outputData
private

Definition at line 67 of file trfGraph.py.

◆ _toposort

python.trfGraph.executorGraph._toposort
private

Definition at line 89 of file trfGraph.py.

◆ _toposortData

python.trfGraph.executorGraph._toposortData
private

Definition at line 90 of file trfGraph.py.


The documentation for this class was generated from the following file: