ATLAS Offline Software
python.trfGraph.executorGraph Class Reference

Simple graph object describing the links between executors. More...


Public Types

typedef HLT::TypeInformation::for_each_type_c< typenameEDMLIST::map, my_functor, my_result<>, my_arg< HLT::TypeInformation::get_cont, CONTAINER > >::type result

Public Member Functions

 __init__ (self, executorSet, inputData=set([]), outputData=set([]))
 Initialise executor graph.
 inputData (self)
 inputData (self, inputData)
 outputData (self)
 outputData (self, outputData)
 execution (self)
 Return a list of execution nodes with their data inputs/outputs.
 data (self)
 Return a list of all data used in this execution.
 addNode (self, executor)
 Add an executor node to the graph.
 deleteNote (self, executor)
 Remove an executor node from the graph.
 findConnections (self)
 Look at executor nodes and work out how they are connected.
 doToposort (self)
 Find a topologically sorted list of the graph nodes.
 findExecutionPath (self)
 Find the graph's execution nodes, from input to output data types with each activated step and the inputs/outputs.
 __str__ (self)
 Nodes in topologically sorted order, if available, else sorted name order.
 __repr__ (self)
 Nodes in topologically sorted order, if available, else sorted name order.

Protected Member Functions

 _resetConnections (self)
 _bestPath (self, data, dataAvailable, startNodeName='_start', endNodeName=None)
 Find the best path from an end node to a start node, producing a certain type of data given the set of currently available data and the current set of activated nodes.
 _extendPath (self, path, currentNodeName, nextNodeName)
 Connect a path to a particular node.

Protected Attributes

dict _nodeDict = {}
 _inputData = set(inputData)
 _outputData = set(outputData)
list _toposort = []
list _toposortData = []
dict _execution = {}

Detailed Description

Simple graph object describing the links between executors.

Definition at line 42 of file trfGraph.py.

Member Typedef Documentation

◆ result

Definition at line 90 of file EDM_MasterSearch.h.

Constructor & Destructor Documentation

◆ __init__()

python.trfGraph.executorGraph.__init__ ( self,
executorSet,
inputData = set([]),
outputData = set([]) )

Initialise executor graph.

Parameters
executorSet  Set of executor instances
inputData    Iterable with input data for this transform's execution
outputData   Iterable with output data for this transform's execution

Definition at line 48 of file trfGraph.py.

48 def __init__(self, executorSet, inputData = set([]), outputData = set([])):
49
50     # Set basic node list
51     self._nodeDict = {}
52
53     msg.info('Transform graph input data: {0}; output data {1}'.format(inputData, outputData))
54
55     if len(executorSet) == 1:
56         # Single executor - in this case inData/outData is not mandatory, so we set them to the
57         # input/output data of the transform
58         executor = list(executorSet)[0]
59         if len(executor._inData) == 0 and len(executor._outData) == 0:
60             executor.inData = inputData
61             executor.outData = outputData
62
63     for executor in executorSet:
64         self.addNode(executor)
65
66     self._inputData = set(inputData)
67     self._outputData = set(outputData)
68
69     # It's forbidden for a transform to consume and produce the same datatype
70     dataOverlap = self._inputData & self._outputData
71     if len(dataOverlap) > 0:
72         raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
73             'Transform definition error, you cannot produce and consume the same datatypes in a transform. Duplicated input/output types {0}.'.format(' '.join(dataOverlap)))
74
75     # Add a pseudo-start/stop nodes, from which input data flows and output data finally arrives
76     # This makes the graph 'concrete' for this job
77     # This is useful as then data edges all connect properly to a pair of nodes
78     # We add a node for every possible output as this enables topo sorting of the graph
79     # nodes for any intermediate data end nodes as well
80     pseudoNodes = dict()
81     pseudoNodes['_start'] = graphNode(name='_start', inData=[], outData=self._inputData, weight = 0)
82     for node in self._nodeDict.values():
83         for dataType in node.outputDataTypes:
84             endNodeName = '_end_{0}'.format(dataType)
85             pseudoNodes[endNodeName] = graphNode(name=endNodeName, inData=[dataType], outData=[], weight = 0)
86     self._nodeDict.update(pseudoNodes)
87
88     # Toposort not yet done
89     self._toposort = []
90     self._toposortData = []
91
92     # Now find connections between nodes
93     self.findConnections()
94
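
Two rules enforced by the constructor, the ban on overlapping input/output data types and the naming of the pseudo-nodes, can be illustrated in isolation. The following is a minimal sketch that assumes plain sets and strings in place of the real graphNode and trfExceptions classes; `check_no_overlap` and `pseudo_node_names` are hypothetical helpers and are not part of trfGraph.py.

```python
def check_no_overlap(input_data, output_data):
    """Raise if a data type is both consumed and produced by the transform."""
    overlap = set(input_data) & set(output_data)
    if overlap:
        # The listing raises trfExceptions.TransformSetupException;
        # ValueError is only a stand-in for this sketch.
        raise ValueError('Duplicated input/output types {0}'.format(' '.join(sorted(overlap))))


def pseudo_node_names(node_outputs):
    """Return '_start' plus one '_end_<type>' name per data type any node can produce."""
    names = {'_start'}
    for outputs in node_outputs.values():
        for data_type in outputs:
            names.add('_end_{0}'.format(data_type))
    return names


# Hypothetical two-step chain: RAW -> ESD -> AOD
outputs = {'RAWtoESD': {'ESD'}, 'ESDtoAOD': {'AOD'}}
check_no_overlap({'RAW'}, {'AOD'})        # passes silently
names = pseudo_node_names(outputs)
print(sorted(names))
```

Creating an end node for every producible type, including intermediate ones such as ESD in this example, is what lets a later path search start from `_end_<type>` for any requested output.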

Member Function Documentation

◆ __repr__()

python.trfGraph.executorGraph.__repr__ ( self)

Nodes in topologically sorted order, if available, else sorted name order.

Definition at line 416 of file trfGraph.py.

416 def __repr__(self):
417     nodeStrList = []
418     if len(self._toposort) > 0:
419         nodeNames = self._toposort
420     else:
421         nodeNames = list(self._nodeDict)
422         nodeNames.sort()
423     for nodeName in nodeNames:
424         nodeStrList.append(repr(self._nodeDict[nodeName]))
425     return os.linesep.join(nodeStrList)
426
427

◆ __str__()

python.trfGraph.executorGraph.__str__ ( self)

Nodes in topologically sorted order, if available, else sorted name order.

Definition at line 402 of file trfGraph.py.

402 def __str__(self):
403     nodeStrList = []
404     if len(self._toposort) > 0:
405         nodeNames = self._toposort
406     else:
407         nodeNames = list(self._nodeDict)
408         nodeNames.sort()
409     for nodeName in nodeNames:
410         if not nodeName.startswith('_'):
411             nodeStrList.append(str(self._nodeDict[nodeName]))
412     return os.linesep.join(nodeStrList)
413
414

◆ _bestPath()

python.trfGraph.executorGraph._bestPath ( self,
data,
dataAvailable,
startNodeName = '_start',
endNodeName = None )
protected

Find the best path from an end node to a start node, producing a certain type of data given the set of currently available data and the current set of activated nodes.

Parameters
data           Data to produce
dataAvailable  Data types which can be used as sources
startNodeName  Find the path to this node (default '_start')
endNodeName    Find the path from this node (default '_end_DATATYPE')

We can always ask the algorithm to trace the path from end to start for this data type (this data is in the end node by construction). If we have to go along an edge where the data is not yet available, then we need to add this data to our list of data to produce.

Definition at line 299 of file trfGraph.py.

299 def _bestPath(self, data, dataAvailable, startNodeName = '_start', endNodeName = None):
300
301     if endNodeName is None:
302         endNodeName = '_end_{0}'.format(data)
303
304     if endNodeName not in self._nodeDict:
305         raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
306             'Node {0} was not found - the transform data connection definition is broken'.format(endNodeName))
307
308
309     # Set of all considered paths
310     # Initialise this with our endNode name - algorithm works back to the start
311     pathSet = [graphPath(endNodeName, data),]
312
313     msg.debug('Started path finding with seed path {0}'.format(pathSet[0]))
314
315     # Halting condition - only one path and its first element is startNodeName
316     while len(pathSet) > 1 or pathSet[0].path[0] != startNodeName:
317         msg.debug('Starting best path iteration with {0} paths in {1}'.format(len(pathSet), pathSet))
318         # Copy the pathSet to do this, as we will update it
319         for path in pathSet[:]:
320             msg.debug('Continuing path finding with path {0}'.format(path))
321             currentNodeName = path.path[0]
322             if currentNodeName == startNodeName:
323                 msg.debug('Path {0} has reached the start node - finished'.format(path))
324                 continue
325             # If there are no paths out of this node then it's a dead end - kill it
326             if len(self._nodeDict[currentNodeName].connections['in']) == 0:
327                 msg.debug('Path {0} is a dead end - removing'.format(path))
328                 pathSet.remove(path)
329                 continue
330             # If there is only one path out of this node, we extend it
331             if len(self._nodeDict[currentNodeName].connections['in']) == 1:
332                 msg.debug('Single exit from path {0} - adding connection to {1}'.format(path, list(self._nodeDict[currentNodeName].connections['in'])[0]))
333                 self._extendPath(path, currentNodeName, list(self._nodeDict[currentNodeName].connections['in'])[0])
334                 continue
335             # Else we need to clone the path for each possible exit
336             msg.debug('Multiple exits from path {0} - will clone for each extra exit'.format([path]))
337             for nextNodeName in list(self._nodeDict[currentNodeName].connections['in'])[1:]:
338                 newPath = copy.deepcopy(path)
339                 msg.debug('Cloned exit from path {0} to {1}'.format(newPath, nextNodeName))
340                 self._extendPath(newPath, currentNodeName, nextNodeName)
341                 pathSet.append(newPath)
342             # Finally, use the original path to extend along the first node exit
343             msg.debug('Adding exit from original path {0} to {1}'.format(path, list(self._nodeDict[currentNodeName].connections['in'])[0]))
344             self._extendPath(path, currentNodeName, list(self._nodeDict[currentNodeName].connections['in'])[0])
345
346         # Now compare paths which made it to the end - only keep the shortest
347         lowestCostPath = None
348         for path in pathSet[:]:
349             currentNodeName = path.path[0]
350             if currentNodeName == startNodeName:
351                 if lowestCostPath is None:
352                     lowestCostPath = path
353                     continue
354                 if path.cost >= lowestCostPath.cost:
355                     msg.debug('Path {0} is no cheaper than best path {1} - removing'.format(path, lowestCostPath))
356                     pathSet.remove(path)
357                 else:
358                     msg.debug('Path {0} is cheaper than previous best path {1} - removing previous'.format(path, lowestCostPath))
359                     pathSet.remove(lowestCostPath)
360                     lowestCostPath = path
361
362         # Emergency break
363         if len(pathSet) == 0:
364             raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
365                 'No path found between {0} and {1} for {2}'.format(startNodeName, endNodeName, data))
366     return pathSet[0]
367
368
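
The backward search described above (start at the end node, follow 'in' connections towards the start node, drop dead ends, keep the cheapest complete path) can be sketched without the graphPath class. This is an illustrative simplification, not the method itself: `best_path` is a hypothetical function, paths are plain lists, each node has a single weight, the graph is assumed to be acyclic, and the handling of already-enabled nodes and multi-data inputs is left out.

```python
def best_path(in_edges, weights, end, start='_start'):
    """Return (cost, path) for the cheapest route found by walking backwards from end.

    in_edges maps a node name to the list of its upstream node names;
    weights maps a node name to the cost of running it (pseudo-nodes cost 0).
    """
    complete = []
    partial = [(0, [end])]
    while partial:
        cost, path = partial.pop()
        head = path[0]
        if head == start:
            complete.append((cost, path))
            continue
        # A node with no upstream neighbours is a dead end: nothing is pushed
        # back, so the partial path is simply dropped.
        for upstream in in_edges.get(head, []):
            # Leaving 'head' backwards means 'head' has to run, so add its weight
            partial.append((cost + weights.get(head, 0), [upstream] + path))
    if not complete:
        raise ValueError('No path found between {0} and {1}'.format(start, end))
    return min(complete, key=lambda item: item[0])


# Hypothetical graph with two routes from RAW to AOD
in_edges = {
    '_end_AOD': ['ESDtoAOD', 'RAWtoALL'],
    'ESDtoAOD': ['RAWtoESD'],
    'RAWtoESD': ['_start'],
    'RAWtoALL': ['_start'],
}
weights = {'ESDtoAOD': 1, 'RAWtoESD': 1, 'RAWtoALL': 1}
cost, path = best_path(in_edges, weights, '_end_AOD')
print(cost, path)
```

In this example the single-step route costs 1 and the two-step route costs 2, so the single-step route is returned.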

◆ _extendPath()

python.trfGraph.executorGraph._extendPath ( self,
path,
currentNodeName,
nextNodeName )
protected

Connect a path to a particular node.

Parameters
path          graphPath instance
nextNodeName  Node to connect to

Definition at line 372 of file trfGraph.py.

372 def _extendPath(self, path, currentNodeName, nextNodeName):
373     edgeData = self._nodeDict[currentNodeName].connections['in'][nextNodeName]
374     msg.debug('Connecting {0} to {1} with data {2}'.format(currentNodeName, nextNodeName, edgeData))
375
376     extraData = set()
377     if self._execution[currentNodeName]['enabled'] is True:
378         extraCost = 0
379     else:
380         for edgeDataElement in edgeData:
381             # Simple case - one data connection only
382             if edgeDataElement in self._nodeDict[currentNodeName].inData:
383                 extraCost = self._nodeDict[currentNodeName].weights[edgeDataElement]
384             else:
385                 # Complex case - the start requirement for this node must be multi-data
386                 # Only the first match in the dataIn lists is considered
387                 # This will break if there are multiple overlapping dataIn requirements
388                 for nodeStartData in self._nodeDict[currentNodeName].inData:
389                     if isinstance(nodeStartData, (list, tuple)) and edgeDataElement in nodeStartData:
390                         extraCost = self._nodeDict[currentNodeName].weights[nodeStartData]
391                         msg.debug('Found multi-data exit from {0} to {1} - adding {2} to data requirements'.format(currentNodeName, nextNodeName, nodeStartData))
392                         extraData.update(nodeStartData)
393                         break
394         # Remove data which is on the edge itself
395         extraData.difference_update(edgeData)
396
397     msg.debug('Updating path {0} with {1}, {2}, {3}, {4}'.format(path, nextNodeName, edgeData, extraData, extraCost))
398     path.addToPath(nextNodeName, edgeData, extraData, extraCost)
399
400

◆ _resetConnections()

python.trfGraph.executorGraph._resetConnections ( self)
protected

Definition at line 148 of file trfGraph.py.

148 def _resetConnections(self):
149     for node in self._nodeDict.values():
150         node.resetConnections()
151

◆ addNode()

python.trfGraph.executorGraph.addNode ( self,
executor )

Add an executor node to the graph.

Definition at line 138 of file trfGraph.py.

138 def addNode(self, executor):
139     self._nodeDict[executor.name] = executorNode(executor)
140
141

◆ data()

python.trfGraph.executorGraph.data ( self)

Return a list of all data used in this execution.

Definition at line 126 of file trfGraph.py.

126 def data(self):
127     dataset = set()
128     for nodeName in self._toposort:
129         # Start and end nodes are not real - they never actually execute
130         if nodeName.startswith(('_start', '_end')):
131             continue
132         if self._execution[nodeName]['enabled'] is True:
133             dataset.update(self._execution[nodeName]['input'])
134             dataset.update(self._execution[nodeName]['output'])
135     return dataset
136

◆ deleteNote()

python.trfGraph.executorGraph.deleteNote ( self,
executor )

Remove an executor node from the graph.

Definition at line 143 of file trfGraph.py.

143 def deleteNote(self, executor):
144     if executor.name in self._nodeDict:
145         del(self._nodeDict[executor.name])
146
147

◆ doToposort()

python.trfGraph.executorGraph.doToposort ( self)

Find a topologically sorted list of the graph nodes.

Note
If this is not possible, the graph is not a DAG - not supported
See http://en.wikipedia.org/wiki/Topological_sorting

Definition at line 171 of file trfGraph.py.

171 def doToposort(self):
172     # We will manipulate the graph, so deepcopy it
173     graphCopy = copy.deepcopy(self._nodeDict)
174     # Find all valid start nodes in this graph - ones with no data dependencies themselves
175     startNodeNames = []
176     for nodeName, node in graphCopy.items():
177         if len(node.connections['in']) == 0:
178             startNodeNames.append(nodeName)
179
180     if len(startNodeNames) == 0:
181         raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
182             'There are no starting nodes in this graph - non-DAG graphs are not supported')
183
184     msg.debug('Found this list of start nodes for toposort: {0}'.format(startNodeNames))
185
186     # The startNodeNames holds the list of nodes with their dependencies now satisfied (no input edges anymore)
187     while len(startNodeNames) > 0:
188         # Take the next startNodeName and zap it from the graph
189         theNodeName = startNodeNames.pop()
190         theNode = graphCopy[theNodeName]
191         self._toposort.append(theNodeName)
192         del graphCopy[theNodeName]
193
194         # Now delete the edges this node was a source for
195         msg.debug('Considering connections from node {0}'.format(theNodeName))
196         for connectedNodeName in theNode.connections['out']:
197             graphCopy[connectedNodeName].delConnection(toExe = theNodeName, direction = 'in')
198             # Look for nodes which now have their dependencies satisfied
199             if len(graphCopy[connectedNodeName].connections['in']) == 0:
200                 startNodeNames.append(connectedNodeName)
201
202     # If there are nodes left then the graph has cycles, which means it's not a DAG
203     if len(graphCopy) > 0:
204         raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
205             'Graph topological sort had no more start nodes, but nodes were left {0} - non-DAG graphs are not supported'.format(list(graphCopy)))
206
207     msg.debug('Topologically sorted node order: {0}'.format(self._toposort))
208
209     # Now toposort the input data for nodes
210     self._toposortData = []
211     for nodeName in self._toposort:
212         # First add input data, then output data
213         for dataType in self._nodeDict[nodeName].inputDataTypes:
214             if dataType not in self._toposortData:
215                 self._toposortData.append(dataType)
216         for dataType in self._nodeDict[nodeName].outputDataTypes:
217             if dataType not in self._toposortData:
218                 self._toposortData.append(dataType)
219
220     msg.debug('Topologically sorted data order: {0}'.format(self._toposortData))
221
222
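
The node-removal procedure in the listing follows the pattern commonly known as Kahn's algorithm. Below is a minimal sketch on plain dictionaries of sets, assuming that representation in place of the graphNode connection objects; `toposort` is a hypothetical standalone function and ValueError stands in for TransformGraphException.

```python
def toposort(in_edges, out_edges):
    """Return node names in dependency order.

    in_edges maps each node name to the set of nodes it depends on;
    out_edges maps a node name to the set of nodes that depend on it.
    """
    # Work on a copy, because edges are deleted as nodes are emitted
    remaining = {name: set(sources) for name, sources in in_edges.items()}
    ready = [name for name, sources in remaining.items() if not sources]
    if not ready:
        raise ValueError('no start nodes: graph is not a DAG')
    order = []
    while ready:
        name = ready.pop()
        order.append(name)
        del remaining[name]
        for consumer in out_edges.get(name, ()):
            remaining[consumer].discard(name)
            # A consumer becomes ready once all of its inputs have been emitted
            if not remaining[consumer]:
                ready.append(consumer)
    if remaining:
        raise ValueError('cycle detected among {0}'.format(sorted(remaining)))
    return order


# Hypothetical linear chain with pseudo-nodes
in_edges = {'_start': set(), 'RAWtoESD': {'_start'},
            'ESDtoAOD': {'RAWtoESD'}, '_end_AOD': {'ESDtoAOD'}}
out_edges = {'_start': {'RAWtoESD'}, 'RAWtoESD': {'ESDtoAOD'},
             'ESDtoAOD': {'_end_AOD'}}
order = toposort(in_edges, out_edges)
print(order)
```

As in the listing, nodes left over after the ready list empties indicate a cycle, which is reported as an error rather than resolved.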

◆ execution()

python.trfGraph.executorGraph.execution ( self)

Return a list of execution nodes with their data inputs/outputs.

Definition at line 113 of file trfGraph.py.

113 def execution(self):
114     exeList = []
115     for nodeName in self._toposort:
116         # Start and end nodes are not real - they never actually execute
117         if nodeName.startswith(('_start', '_end')):
118             continue
119         if self._execution[nodeName]['enabled'] is True:
120             exeList.append({'name': nodeName, 'input': self._execution[nodeName]['input'],
121                             'output': self._execution[nodeName]['output']})
122     return exeList
123
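
To make the shape of the returned list concrete, here is a small sketch that applies the same filtering to hand-written data. The node names and the contents of the `execution` dictionary are invented for illustration, and `enabled_steps` is a hypothetical free function rather than the method itself.

```python
def enabled_steps(toposort, execution):
    """Return the enabled real nodes in topological order, skipping pseudo-nodes."""
    steps = []
    for name in toposort:
        if name.startswith(('_start', '_end')):
            continue
        if execution[name]['enabled'] is True:
            steps.append({'name': name,
                          'input': execution[name]['input'],
                          'output': execution[name]['output']})
    return steps


# Hypothetical state after an execution path has been found for ESD only
toposort = ['_start', 'RAWtoESD', 'ESDtoAOD', '_end_ESD', '_end_AOD']
execution = {
    '_start': {'enabled': True, 'input': set(), 'output': {'RAW'}},
    'RAWtoESD': {'enabled': True, 'input': {'RAW'}, 'output': {'ESD'}},
    'ESDtoAOD': {'enabled': False, 'input': set(), 'output': set()},
    '_end_ESD': {'enabled': True, 'input': {'ESD'}, 'output': set()},
    '_end_AOD': {'enabled': False, 'input': set(), 'output': set()},
}
steps = enabled_steps(toposort, execution)
print(steps)
```

Each entry is a dictionary with the keys 'name', 'input' and 'output'; disabled nodes and the `_start`/`_end_*` pseudo-nodes never appear.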

◆ findConnections()

python.trfGraph.executorGraph.findConnections ( self)

Look at executor nodes and work out how they are connected.

Note
Anything better than n^2? Should be ok for our low numbers of nodes, but could be optimised

Definition at line 154 of file trfGraph.py.

154 def findConnections(self):
155     self._resetConnections()
156     for nodeNameA, nodeA in self._nodeDict.items():
157         for nodeNameB, nodeB in self._nodeDict.items():
158             if nodeNameA == nodeNameB:
159                 continue
160             dataIntersection = list(set(nodeA.outputDataTypes) & set(nodeB.inputDataTypes))
161             msg.debug('Data connections between {0} and {1}: {2}'.format(nodeNameA, nodeNameB, dataIntersection))
162             if len(dataIntersection) > 0:
163                 nodeA.addConnection(nodeNameB, dataIntersection, direction='out')
164                 nodeB.addConnection(nodeNameA, dataIntersection, direction='in')
165
166     msg.debug('Graph connections are: \n{0}'.format(self))
167
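
The pairwise comparison can be sketched on plain data to show what counts as a connection: node A feeds node B whenever A's output types intersect B's input types. The representation below (a dictionary of `(inputs, outputs)` tuples and an edge dictionary keyed by `(producer, consumer)`) is an assumption made for this sketch, and `find_connections` is a hypothetical function.

```python
def find_connections(nodes):
    """Compare every ordered pair of nodes and record the data types they share.

    nodes maps a node name to a tuple (input data types, output data types).
    Returns {(producer, consumer): set of shared data types}.
    """
    edges = {}
    for name_a, (_, out_a) in nodes.items():
        for name_b, (in_b, _) in nodes.items():
            if name_a == name_b:
                continue
            shared = set(out_a) & set(in_b)
            if shared:
                edges[(name_a, name_b)] = shared
    return edges


# Hypothetical chain including the pseudo-nodes
nodes = {
    '_start': (set(), {'RAW'}),
    'RAWtoESD': ({'RAW'}, {'ESD'}),
    'ESDtoAOD': ({'ESD'}, {'AOD'}),
    '_end_AOD': ({'AOD'}, set()),
}
edges = find_connections(nodes)
print(edges)
```

The double loop visits n(n-1) ordered pairs, which is the n^2 cost mentioned in the note above.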

◆ findExecutionPath()

python.trfGraph.executorGraph.findExecutionPath ( self)

Find the graph's execution nodes, from input to output data types with each activated step and the inputs/outputs.

Parameters
outputDataTypes  Data to produce
inputDataTypes   Data available as inputs

Definition at line 227 of file trfGraph.py.

227 def findExecutionPath(self):
228     # Switch off all nodes, except if we have a single node which is not data driven...
229     self._execution = {}
230     for nodeName, node in self._nodeDict.items():
231         if len(self._nodeDict) == 1 and node.inputDataTypes == set() and node.inputDataTypes == set():
232             self._execution[nodeName] = {'enabled' : True, 'input' : set(), 'output' : set()}
233         else:
234             self._execution[nodeName] = {'enabled' : False, 'input' : set(), 'output' : set()}
235
236     dataToProduce = copy.deepcopy(self._outputData)
237     dataAvailable = copy.deepcopy(self._inputData)
238
239     # Consider the next data type in topo order
240     while len(dataToProduce) > 0:
241         nextDataType = None
242         for dataType in self._toposortData:
243             if dataType in dataToProduce:
244                 nextDataType = dataType
245                 dataToProduce.remove(nextDataType)
246                 dataAvailable.update([nextDataType])
247                 break
248
249         if not nextDataType:
250             msg.error('Still have to produce data type(s) {0}, but did not find anything in the toposorted data list ({1}).'
251                       ' Transform parameters/graph are broken so aborting.'.format(dataToProduce, self._toposortData))
252             raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
253                 'Data type graph error')
254
255         msg.debug('Next data type to try is {0}'.format(nextDataType))
256         bestPath = self._bestPath(nextDataType, dataAvailable)
257
258         msg.debug('Found best path for {0}: {1}'.format(nextDataType, bestPath))
259
260
261         modPath = bestPath.path + [None]
262         for (nodeName, nextNodeName) in [ (n, modPath[modPath.index(n)+1]) for n in bestPath.path ]:
263             self._execution[nodeName]['enabled'] = True
264             # Add the necessary data types to the output of the first node and the input of the next
265             if nodeName in bestPath.newData:
266                 self._execution[nodeName]['output'].update(bestPath.newData[nodeName])
267                 for newData in bestPath.newData[nodeName]:
268                     if newData not in dataAvailable:
269                         dataToProduce.update([newData])
270             if nextNodeName:
271                 self._execution[nextNodeName]['input'].update(bestPath.newData[nodeName])
272                 if nextNodeName in bestPath.extraData:
273                     self._execution[nextNodeName]['input'].update(bestPath.extraData[nodeName])
274             # Add any extra data we need (from multi-exit nodes) to the data to produce list
275             for extraNodeData in bestPath.extraData.values():
276                 for extra in extraNodeData:
277                     if extra not in dataAvailable:
278                         dataToProduce.update([extra])
279
280     # Now remove the fake data objects from activated nodes
281     for node, props in self._execution.items():
282         msg.debug('Removing fake data from node {0}'.format(node))
283         props['input'] -= set(['inNULL', 'outNULL'])
284         props['output'] -= set(['inNULL', 'outNULL'])
285
286     msg.debug('Execution dictionary: {0}'.format(self._execution))
287
288

◆ inputData() [1/2]

python.trfGraph.executorGraph.inputData ( self)

Definition at line 96 of file trfGraph.py.

96 def inputData(self):
97     return self._inputData
98

◆ inputData() [2/2]

python.trfGraph.executorGraph.inputData ( self,
inputData )

Definition at line 100 of file trfGraph.py.

100 def inputData(self, inputData):
101     self._inputData = set(inputData)
102

◆ outputData() [1/2]

python.trfGraph.executorGraph.outputData ( self)

Definition at line 104 of file trfGraph.py.

104 def outputData(self):
105     return self._outputData
106

◆ outputData() [2/2]

python.trfGraph.executorGraph.outputData ( self,
outputData )

Definition at line 108 of file trfGraph.py.

108 def outputData(self, outputData):
109     self._outputData = set(outputData)
110
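
The paired [1/2] and [2/2] entries for inputData and outputData are consistent with a Python property getter and setter, although the decorators do not appear in the listings above. Assuming that is the case, the pattern looks like the following sketch; `DataHolder` is a hypothetical stand-in class, not executorGraph.

```python
class DataHolder(object):
    """Hypothetical stand-in showing a getter/setter pair that normalises to a set."""

    def __init__(self, inputData=()):
        self._inputData = set(inputData)

    @property
    def inputData(self):
        return self._inputData

    @inputData.setter
    def inputData(self, inputData):
        # Any iterable is accepted and stored as a set, as in the listing above
        self._inputData = set(inputData)


holder = DataHolder(['RAW'])
holder.inputData = ['RAW', 'RAW', 'ESD']   # duplicates collapse on assignment
print(holder.inputData)
```

With this pattern, callers assign and read `inputData` as an attribute while the class keeps the stored value a set.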

Member Data Documentation

◆ _execution

python.trfGraph.executorGraph._execution = {}
protected

Definition at line 229 of file trfGraph.py.

◆ _inputData

python.trfGraph.executorGraph._inputData = set(inputData)
protected

Definition at line 66 of file trfGraph.py.

◆ _nodeDict

python.trfGraph.executorGraph._nodeDict = {}
protected

Definition at line 51 of file trfGraph.py.

◆ _outputData

python.trfGraph.executorGraph._outputData = set(outputData)
protected

Definition at line 67 of file trfGraph.py.

◆ _toposort

python.trfGraph.executorGraph._toposort = []
protected

Definition at line 89 of file trfGraph.py.

◆ _toposortData

python.trfGraph.executorGraph._toposortData = []
protected

Definition at line 90 of file trfGraph.py.


The documentation for this class was generated from the following file: