ATLAS Offline Software
Loading...
Searching...
No Matches
trfGraph.py
Go to the documentation of this file.
1# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
2
26
27
28import copy
29import os
30
31import logging
32msg = logging.getLogger(__name__)
33
34import PyJobTransforms.trfExceptions as trfExceptions
35
36from PyJobTransforms.trfExitCodes import trfExit
37
38
39
40
41
43
44
def __init__(self, executorSet, inputData = None, outputData = None):
    """Initialise the executor graph.

    One node is created per executor, pseudo start/end nodes are added and
    the data connections between all nodes are then worked out.

    @param executorSet Collection of executors to turn into graph nodes
    @param inputData Iterable of data types supplied to the transform
           (defaults to an empty set)
    @param outputData Iterable of data types the transform has to produce
           (defaults to an empty set)
    @raise trfExceptions.TransformSetupException if a data type appears in
           both the input and the output data
    """
    # Use a fresh set per call: a default of set([]) in the signature is a single
    # shared object, and it is handed over to the executor below
    if inputData is None:
        inputData = set()
    if outputData is None:
        outputData = set()

    # Set basic node list
    self._nodeDict = {}

    msg.info('Transform graph input data: {0}; output data {1}'.format(inputData, outputData))

    if len(executorSet) == 1:
        # Single executor - in this case inData/outData is not mandatory, so we set them to the
        # input/output data of the transform
        executor = list(executorSet)[0]
        if len(executor._inData) == 0 and len(executor._outData) == 0:
            executor.inData = inputData
            executor.outData = outputData

    for executor in executorSet:
        self.addNode(executor)

    self._inputData = set(inputData)
    self._outputData = set(outputData)

    # It's forbidden for a transform to consume and produce the same datatype
    dataOverlap = self._inputData & self._outputData
    if len(dataOverlap) > 0:
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
            'Transform definition error, you cannot produce and consume the same datatypes in a transform. Duplicated input/output types {0}.'.format(' '.join(dataOverlap)))

    # Add a pseudo-start/stop nodes, from which input data flows and output data finally arrives
    # This makes the graph 'concrete' for this job
    # This is useful as then data edges all connect properly to a pair of nodes
    # We add a node for every possible output as this enables topo sorting of the graph
    # nodes for any intermediate data end nodes as well
    pseudoNodes = dict()
    pseudoNodes['_start'] = graphNode(name='_start', inData=[], outData=self._inputData, weight = 0)
    for node in self._nodeDict.values():
        for dataType in node.outputDataTypes:
            endNodeName = '_end_{0}'.format(dataType)
            pseudoNodes[endNodeName] = graphNode(name=endNodeName, inData=[dataType], outData=[], weight = 0)
    self._nodeDict.update(pseudoNodes)

    # Toposort not yet done
    self._toposort = []
    self._toposortData = []

    # Now find connections between nodes
    self.findConnections()
94
@property
def inputData(self):
    """Set of data types fed into the graph."""
    return self._inputData

@inputData.setter
def inputData(self, inputData):
    # Always store a set, whatever iterable the caller hands in
    asSet = set(inputData)
    self._inputData = asSet
102
@property
def outputData(self):
    """Set of data types the graph has to deliver."""
    return self._outputData

@outputData.setter
def outputData(self, outputData):
    # Always store a set, whatever iterable the caller hands in
    asSet = set(outputData)
    self._outputData = asSet
110
111
@property
def execution(self):
    """Return the enabled execution nodes, in topological order.

    Each entry is a dictionary with the keys 'name', 'input' and 'output'.
    Start and end nodes are not real - they never actually execute - so they
    are left out, as are nodes which are not enabled.
    """
    return [
        {'name': nodeName,
         'input': self._execution[nodeName]['input'],
         'output': self._execution[nodeName]['output']}
        for nodeName in self._toposort
        if not nodeName.startswith(('_start', '_end'))
        and self._execution[nodeName]['enabled'] is True
    ]
123
124
@property
def data(self):
    """Return the set of all data types read or written by enabled nodes."""
    touched = set()
    for nodeName in self._toposort:
        # Start and end nodes are not real - they never actually execute
        if nodeName.startswith(('_start', '_end')):
            continue
        nodeState = self._execution[nodeName]
        if nodeState['enabled'] is True:
            touched.update(nodeState['input'], nodeState['output'])
    return touched
136
137
def addNode(self, executor):
    """Add an executor node to the graph, keyed by the executor's name.

    A node already stored under the same name is replaced.
    """
    newNode = executorNode(executor)
    self._nodeDict[executor.name] = newNode
140
141
142
def deleteNote(self, executor):
    """Remove the node stored under the executor's name from the graph.

    Nothing happens if no node with that name is present.
    """
    self._nodeDict.pop(executor.name, None)
146
147
149 for node in self._nodeDict.values():
150 node.resetConnections()
151
152
155 self._resetConnections()
156 for nodeNameA, nodeA in self._nodeDict.items():
157 for nodeNameB, nodeB in self._nodeDict.items():
158 if nodeNameA == nodeNameB:
159 continue
160 dataIntersection = list(set(nodeA.outputDataTypes) & set(nodeB.inputDataTypes))
161 msg.debug('Data connections between {0} and {1}: {2}'.format(nodeNameA, nodeNameB, dataIntersection))
162 if len(dataIntersection) > 0:
163 nodeA.addConnection(nodeNameB, dataIntersection, direction='out')
164 nodeB.addConnection(nodeNameA, dataIntersection, direction='in')
165
166 msg.debug('Graph connections are: \n{0}'.format(self))
167
168
def doToposort(self):
    """Find a topologically sorted list of the graph nodes and of the data types.

    The node order is stored in self._toposort and the derived data type
    order in self._toposortData. Both are rebuilt from scratch on every call.

    @raise trfExceptions.TransformGraphException if the graph has no node
           without incoming connections, or if nodes are left over once no
           start nodes remain (i.e., the graph is not a DAG)
    """
    # We will manipulate the graph, so deepcopy it
    graphCopy = copy.deepcopy(self._nodeDict)
    # Find all valid start nodes in this graph - ones with no data dependencies themselves
    startNodeNames = [nodeName for nodeName, node in graphCopy.items()
                      if len(node.connections['in']) == 0]

    if len(startNodeNames) == 0:
        raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
                                                    'There are no starting nodes in this graph - non-DAG graphs are not supported')

    msg.debug('Found this list of start nodes for toposort: {0}'.format(startNodeNames))

    # Drop any ordering left from an earlier call, otherwise the node names
    # would be appended a second time
    self._toposort = []

    # The startNodeNames holds the list of nodes with their dependencies now satisfied (no input edges anymore)
    while len(startNodeNames) > 0:
        # Take the next startNodeName and zap it from the graph
        theNodeName = startNodeNames.pop()
        theNode = graphCopy.pop(theNodeName)
        self._toposort.append(theNodeName)

        # Now delete the edges this node was a source for
        msg.debug('Considering connections from node {0}'.format(theNodeName))
        for connectedNodeName in theNode.connections['out']:
            connectedNode = graphCopy[connectedNodeName]
            connectedNode.delConnection(toExe = theNodeName, direction = 'in')
            # Look for nodes which now have their dependencies satisfied
            if len(connectedNode.connections['in']) == 0:
                startNodeNames.append(connectedNodeName)

    # If there are nodes left then the graph has cycles, which means it's not a DAG
    if len(graphCopy) > 0:
        raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
                                                    'Graph topological sort had no more start nodes, but nodes were left {0} - non-DAG graphs are not supported'.format(list(graphCopy)))

    msg.debug('Topologically sorted node order: {0}'.format(self._toposort))

    # Now toposort the input data for nodes
    self._toposortData = []
    for nodeName in self._toposort:
        node = self._nodeDict[nodeName]
        # First add input data, then output data
        for dataType in node.inputDataTypes:
            if dataType not in self._toposortData:
                self._toposortData.append(dataType)
        for dataType in node.outputDataTypes:
            if dataType not in self._toposortData:
                self._toposortData.append(dataType)

    msg.debug('Topologically sorted data order: {0}'.format(self._toposortData))
221
222
223
def findExecutionPath(self):
    """Find the graph's execution nodes, from input to output data types.

    Fills self._execution with one entry per node, holding an 'enabled' flag
    and the 'input' and 'output' data type sets the node is activated with.
    Relies on self._toposortData, which doToposort() fills.

    @raise trfExceptions.TransformGraphException if a data type which still
           has to be produced is not in the toposorted data list
    """
    # Switch off all nodes, except if we have a single node which is not data driven...
    self._execution = {}
    singleNode = len(self._nodeDict) == 1
    for nodeName, node in self._nodeDict.items():
        # "Not data driven" means no input and no output data types; the output
        # side has to be checked as well as the input side
        notDataDriven = node.inputDataTypes == set() and node.outputDataTypes == set()
        self._execution[nodeName] = {'enabled' : singleNode and notDataDriven,
                                     'input' : set(), 'output' : set()}

    dataToProduce = copy.deepcopy(self._outputData)
    dataAvailable = copy.deepcopy(self._inputData)

    # Consider the next data type in topo order
    while len(dataToProduce) > 0:
        nextDataType = None
        for dataType in self._toposortData:
            if dataType in dataToProduce:
                nextDataType = dataType
                dataToProduce.remove(nextDataType)
                dataAvailable.update([nextDataType])
                break

        if not nextDataType:
            msg.error('Still have to produce data type(s) {0}, but did not find anything in the toposorted data list ({1}).'
                      ' Transform parameters/graph are broken so aborting.'.format(dataToProduce, self._toposortData))
            raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
                                                        'Data type graph error')

        msg.debug('Next data type to try is {0}'.format(nextDataType))
        bestPath = self._bestPath(nextDataType, dataAvailable)

        msg.debug('Found best path for {0}: {1}'.format(nextDataType, bestPath))

        # Walk the path in pairs of (currentNode, nextNode); the last node is paired with None
        for (nodeName, nextNodeName) in zip(bestPath.path, bestPath.path[1:] + [None]):
            self._execution[nodeName]['enabled'] = True
            # Add the necessary data types to the output of the first node and the input of the next
            if nodeName in bestPath.newData:
                self._execution[nodeName]['output'].update(bestPath.newData[nodeName])
                for newData in bestPath.newData[nodeName]:
                    if newData not in dataAvailable:
                        dataToProduce.update([newData])
            if nextNodeName:
                self._execution[nextNodeName]['input'].update(bestPath.newData[nodeName])
                if nextNodeName in bestPath.extraData:
                    self._execution[nextNodeName]['input'].update(bestPath.extraData[nodeName])
            # Add any extra data we need (from multi-exit nodes) to the data to produce list
            for extraNodeData in bestPath.extraData.values():
                for extra in extraNodeData:
                    if extra not in dataAvailable:
                        dataToProduce.update([extra])

    # Now remove the fake data objects from activated nodes
    for node, props in self._execution.items():
        msg.debug('Removing fake data from node {0}'.format(node))
        props['input'] -= set(['inNULL', 'outNULL'])
        props['output'] -= set(['inNULL', 'outNULL'])

    msg.debug('Execution dictionary: {0}'.format(self._execution))
287
288
289
def _bestPath(self, data, dataAvailable, startNodeName = '_start', endNodeName = None):
    """Find the cheapest path which produces a data type.

    The search is seeded at the end node and works backwards along incoming
    connections until a single path remains whose first element is the
    start node.

    @param data Data type to produce
    @param dataAvailable Accepted for interface compatibility; not read here
    @param startNodeName Name of the node the path has to begin at
    @param endNodeName Name of the node the path ends at; defaults to the
           '_end_<data>' node
    @return The surviving graphPath
    @raise trfExceptions.TransformGraphException if the end node is not in
           the graph or no path is left
    """
    if endNodeName is None:
        endNodeName = '_end_{0}'.format(data)

    if endNodeName not in self._nodeDict:
        raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
            'Node {0} was not found - the transform data connection definition is broken'.format(endNodeName))

    # All paths still under consideration, seeded with the end node only
    candidates = [graphPath(endNodeName, data)]

    msg.debug('Started path finding with seed path {0}'.format(candidates[0]))

    # Stop once exactly one path is left and it begins at the start node
    while len(candidates) > 1 or candidates[0].path[0] != startNodeName:
        msg.debug('Starting best path iteration with {0} paths in {1}'.format(len(candidates), candidates))
        # Iterate over a snapshot, as the candidate list is modified in the loop
        for candidate in list(candidates):
            msg.debug('Continuing path finding with path {0}'.format(candidate))
            headName = candidate.path[0]
            if headName == startNodeName:
                msg.debug('Path {0} has reached the start node - finished'.format(candidate))
                continue

            feeders = list(self._nodeDict[headName].connections['in'])
            # Nothing feeds this node, so the path can never reach the start
            if len(feeders) == 0:
                msg.debug('Path {0} is a dead end - removing'.format(candidate))
                candidates.remove(candidate)
                continue

            firstFeeder = feeders[0]
            # Exactly one way onwards - extend the path in place
            if len(feeders) == 1:
                msg.debug('Single exit from path {0} - adding connection to {1}'.format(candidate, firstFeeder))
                self._extendPath(candidate, headName, firstFeeder)
                continue

            # Several ways onwards - one clone per additional feeder...
            msg.debug('Multiple exits from path {0} - will clone for each extra exit'.format([candidate]))
            for feeder in feeders[1:]:
                clone = copy.deepcopy(candidate)
                msg.debug('Cloned exit from path {0} to {1}'.format(clone, feeder))
                self._extendPath(clone, headName, feeder)
                candidates.append(clone)
            # ...and the original path follows the first feeder
            msg.debug('Adding exit from original path {0} to {1}'.format(candidate, firstFeeder))
            self._extendPath(candidate, headName, firstFeeder)

        # Of the paths which have reached the start node, keep only the cheapest
        cheapest = None
        for candidate in list(candidates):
            if candidate.path[0] != startNodeName:
                continue
            if cheapest is None:
                cheapest = candidate
            elif candidate.cost >= cheapest.cost:
                msg.debug('Path {0} is no cheaper than best path {1} - removing'.format(candidate, cheapest))
                candidates.remove(candidate)
            else:
                msg.debug('Path {0} is cheaper than previous best path {1} - removing previous'.format(candidate, cheapest))
                candidates.remove(cheapest)
                cheapest = candidate

        # Emergency break
        if len(candidates) == 0:
            raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
                                                        'No path found between {0} and {1} for {2}'.format(startNodeName, endNodeName, data))
    return candidates[0]
367
368
369
def _extendPath(self, path, currentNodeName, nextNodeName):
    """Connect a path to a particular node.

    @param path The graphPath to extend
    @param currentNodeName Node the path currently starts at
    @param nextNodeName Node feeding the current node, which becomes the new
           start of the path
    """
    currentNode = self._nodeDict[currentNodeName]
    edgeData = currentNode.connections['in'][nextNodeName]
    msg.debug('Connecting {0} to {1} with data {2}'.format(currentNodeName, nextNodeName, edgeData))

    extraData = set()
    if self._execution[currentNodeName]['enabled'] is True:
        # Node is already enabled, so no cost is added for it
        extraCost = 0
    else:
        for edgeDataElement in edgeData:
            # Simple case - one data connection only
            if edgeDataElement in currentNode.inData:
                extraCost = currentNode.weights[edgeDataElement]
                continue
            # Complex case - the start requirement for this node must be multi-data
            # Only the first match in the dataIn lists is considered
            # This will break if there are multiple overlapping dataIn requirements
            for nodeStartData in currentNode.inData:
                if not isinstance(nodeStartData, (list, tuple)) or edgeDataElement not in nodeStartData:
                    continue
                extraCost = currentNode.weights[nodeStartData]
                msg.debug('Found multi-data exit from {0} to {1} - adding {2} to data requirements'.format(currentNodeName, nextNodeName, nodeStartData))
                extraData.update(nodeStartData)
                break
        # Remove data which is on the edge itself
        extraData.difference_update(edgeData)

    msg.debug('Updating path {0} with {1}, {2}, {3}, {4}'.format(path, nextNodeName, edgeData, extraData, extraCost))
    path.addToPath(nextNodeName, edgeData, extraData, extraCost)
399
400
401
402 def __str__(self):
403 nodeStrList = []
404 if len(self._toposort) > 0:
405 nodeNames = self._toposort
406 else:
407 nodeNames = list(self._nodeDict)
408 nodeNames.sort()
409 for nodeName in nodeNames:
410 if not nodeName.startswith('_'):
411 nodeStrList.append(str(self._nodeDict[nodeName]))
412 return os.linesep.join(nodeStrList)
413
414
415
416 def __repr__(self):
417 nodeStrList = []
418 if len(self._toposort) > 0:
419 nodeNames = self._toposort
420 else:
421 nodeNames = list(self._nodeDict)
422 nodeNames.sort()
423 for nodeName in nodeNames:
424 nodeStrList.append(repr(self._nodeDict[nodeName]))
425 return os.linesep.join(nodeStrList)
426
427
428
430
431
439 def __init__(self, name, inData, outData, weight = None):
440 self._name = name
441 self._inData = set(inData)
442 self._outData = set(outData)
443
444
446 self._inWeights = {}
447 if weight is None:
448 for data in self._inData:
449 self._inWeights[data] = 1
450 elif isinstance(weight, int):
451 for data in self._inData:
452 self._inWeights[data] = weight
453 else:
454 # Must be a dictionary with its keys equal to the _inData elements
455 self._inWeights = weight
456
459
460 # Connections dictionary will hold incoming and outgoing edges - the incoming connections
461 # are very useful for topological ordering. Nested dictionary with 'in', 'out' keys, where
462 # the values are dictionaries with nodeName keys and set(dataTypes) as values.
463 # e.g., {'out': {'_end_HIST': set(['HIST'])}, 'in': {'ESDtoAOD': set(['HIST_AOD']), 'RAWtoESD': set(['HIST_ESD'])}}
464 self._connections = {'in': {}, 'out': {}}
465
@property
def name(self):
    """Name this node was constructed with."""
    return self._name
469
@property
def inData(self):
    """Set of input data requirements, exactly as stored (not flattened)."""
    return self._inData
473
@property
def outData(self):
    """Set of output data, exactly as stored (not flattened)."""
    return self._outData
477
@property
def inputDataTypes(self):
    """Input data with the members of any list/tuple requirement expanded."""
    rawInputs = self.inData
    return self._flattenSet(rawInputs)
481
482 @property
484 return self._flattenSet(self._outData)
485
@property
def connections(self):
    """Dictionary of this node's edges, with the keys 'in' and 'out'."""
    return self._connections
489
@property
def weights(self):
    """Mapping of input data requirement to its weight."""
    return self._inWeights
493
494
def addConnection(self, toExe, data, direction = 'out'):
    """Add a new edge connection for this node.

    @param toExe Name of the node at the other end of the edge
    @param data Iterable of data types carried by the edge (stored as a set)
    @param direction Key of the connections dictionary to store the edge under
    """
    edges = self._connections[direction]
    edges[toExe] = set(data)
500
501
def delConnection(self, toExe, direction = 'out'):
    """Delete a connection from this node.

    @param toExe Name of the node at the other end of the edge
    @param direction Key of the connections dictionary the edge is stored under
    @raise KeyError if no such connection exists
    """
    edges = self._connections[direction]
    del edges[toExe]
506
507
509 self._connections = {'in': {}, 'out': {}}
510
511
513 def _flattenSet(self, startSet):
514 flatData = set()
515 for data in startSet:
516 if isinstance(data, (list, tuple)):
517 flatData.update(data)
518 else:
519 flatData.update([data])
520 return flatData
521
522 def __str__(self):
523 return '{0} (dataIn {1} -> dataOut {2})'.format(self._name, self._inData, self._outData)
524
525 def __repr__(self):
526 return '{0} (dataIn {1}, weights {2}; dataOut {3}; connect {4})'.format(self._name, self._inData, self._inWeights, self._outData, self._connections)
527
528
529
531
def __init__(self, executor = None, weight = None):
    """executorNode constructor.

    @param executor Executor whose name, inData and outData define the node;
           these attributes are read unconditionally, so the None default
           cannot actually be used
    @param weight Passed through unchanged to the parent constructor
    """
    super(executorNode, self).__init__(name=executor.name,
                                       inData=executor.inData,
                                       outData=executor.outData,
                                       weight=weight)
538
539
541
542
546 def __init__(self, endNodeName, data, cost = 0):
547 self._path = [endNodeName]
548 self._data = data
549 self._cost = cost
550
551
555 self._newData = dict()
556 self._extraData = dict()
557
@property
def path(self):
    """List of node names on this path; new nodes are inserted at the front."""
    return self._path
561
@property
def cost(self):
    """Accumulated cost of this path."""
    return self._cost
565
@property
def newData(self):
    """Dictionary of node name to the newData value recorded by addToPath()."""
    return self._newData
569
@property
def extraData(self):
    """Dictionary of node name to the extraData value recorded by addToPath()."""
    return self._extraData
573
def addToPath(self, newNodeName, newData = None, extraData = None, extraCost = 0):
    """Put a node at the front of the path and record its data and cost.

    @param newNodeName Name of the node to insert at the start of the path
    @param newData Data recorded for the node (defaults to a new empty set)
    @param extraData Extra data recorded for the node (defaults to a new
           empty set)
    @param extraCost Amount added to the path cost
    """
    # The arguments are stored, so each call needs its own default object: a
    # set() default in the signature would be one object shared by every entry
    if newData is None:
        newData = set()
    if extraData is None:
        extraData = set()

    self._path.insert(0, newNodeName)
    self._newData[newNodeName] = newData
    self._cost += extraCost
    self._extraData[newNodeName] = extraData
579
def addCost(self, cost):
    """Increase the cost of this path by the given amount."""
    self._cost += cost
582
583 def __str__(self):
584 return '{0}: path {1}; cost {2}, newData {3}, extraData {4}'.format(self._data, self._path, self._cost, self._newData, self._extraData)
585
Exception for problems finding the path through the graph.
Simple graph object describing the links between executors.
Definition trfGraph.py:42
__init__(self, executorSet, inputData=set([]), outputData=set([]))
Initialise executor graph.
Definition trfGraph.py:48
_bestPath(self, data, dataAvailable, startNodeName='_start', endNodeName=None)
Find the best path from an end to a start node, producing a certain type of data given the set of curr...
Definition trfGraph.py:299
doToposort(self)
Find a topologically sorted list of the graph nodes.
Definition trfGraph.py:171
__str__(self)
Nodes in topologically sorted order, if available, else sorted name order.
Definition trfGraph.py:402
_extendPath(self, path, currentNodeName, nextNodeName)
Connect a path to a particular node.
Definition trfGraph.py:372
addNode(self, executor)
Add an executor node to the graph.
Definition trfGraph.py:138
findConnections(self)
Look at executor nodes and work out how they are connected.
Definition trfGraph.py:154
__repr__(self)
Nodes in topologically sorted order, if available, else sorted name order.
Definition trfGraph.py:416
execution(self)
Return a list of execution nodes with their data inputs/outputs.
Definition trfGraph.py:113
findExecutionPath(self)
Find the graph's execution nodes, from input to output data types with each activated step and the in...
Definition trfGraph.py:227
deleteNote(self, executor)
Remove an executor node from the graph.
Definition trfGraph.py:143
Initialise a graph node from an executor.
Definition trfGraph.py:530
__init__(self, executor=None, weight=None)
executorNode constructor
Definition trfGraph.py:536
Vanilla graph node.
Definition trfGraph.py:429
_flattenSet(self, startSet)
Take a list and return all simple members plus the members of any list/tuples in the set (i....
Definition trfGraph.py:513
delConnection(self, toExe, direction='out')
Delete a connection from this node.
Definition trfGraph.py:504
addConnection(self, toExe, data, direction='out')
Add a new edge connection for this node.
Definition trfGraph.py:498
__init__(self, name, inData, outData, weight=None)
Graph node constructor.
Definition trfGraph.py:439
resetConnections(self)
Delete all connections.
Definition trfGraph.py:508
Path object holding a list of nodes and data types which trace a single path through the graph.
Definition trfGraph.py:540
addToPath(self, newNodeName, newData=set(), extraData=set(), extraCost=0)
Definition trfGraph.py:574
__init__(self, endNodeName, data, cost=0)
graphPath constructor
Definition trfGraph.py:546
STL class.
Module for transform exit codes.