32msg = logging.getLogger(__name__)
34import PyJobTransforms.trfExceptions
as trfExceptions
48 def __init__(self, executorSet, inputData = set([]), outputData =
set([])):
53 msg.info(
'Transform graph input data: {0}; output data {1}'.format(inputData, outputData))
55 if len(executorSet) == 1:
58 executor = list(executorSet)[0]
59 if len(executor._inData) == 0
and len(executor._outData) == 0:
60 executor.inData = inputData
61 executor.outData = outputData
63 for executor
in executorSet:
71 if len(dataOverlap) > 0:
73 'Transform definition error, you cannot produce and consume the same datatypes in a transform. Duplicated input/output types {0}.'.format(
' '.join(dataOverlap)))
81 pseudoNodes[
'_start'] =
graphNode(name=
'_start', inData=[], outData=self.
_inputData, weight = 0)
83 for dataType
in node.outputDataTypes:
84 endNodeName =
'_end_{0}'.format(dataType)
85 pseudoNodes[endNodeName] =
graphNode(name=endNodeName, inData=[dataType], outData=[], weight = 0)
117 if nodeName.startswith((
'_start',
'_end')):
119 if self.
_execution[nodeName][
'enabled']
is True:
120 exeList.append({
'name': nodeName,
'input': self.
_execution[nodeName][
'input'],
121 'output': self.
_execution[nodeName][
'output']})
130 if nodeName.startswith((
'_start',
'_end')):
132 if self.
_execution[nodeName][
'enabled']
is True:
133 dataset.update(self.
_execution[nodeName][
'input'])
134 dataset.update(self.
_execution[nodeName][
'output'])
150 node.resetConnections()
156 for nodeNameA, nodeA
in self.
_nodeDict.items():
157 for nodeNameB, nodeB
in self.
_nodeDict.items():
158 if nodeNameA == nodeNameB:
160 dataIntersection = list(
set(nodeA.outputDataTypes) &
set(nodeB.inputDataTypes))
161 msg.debug(
'Data connections between {0} and {1}: {2}'.format(nodeNameA, nodeNameB, dataIntersection))
162 if len(dataIntersection) > 0:
163 nodeA.addConnection(nodeNameB, dataIntersection, direction=
'out')
164 nodeB.addConnection(nodeNameA, dataIntersection, direction=
'in')
166 msg.debug(
'Graph connections are: \n{0}'.format(self))
173 graphCopy = copy.deepcopy(self.
_nodeDict)
176 for nodeName, node
in graphCopy.items():
177 if len(node.connections[
'in']) == 0:
178 startNodeNames.append(nodeName)
180 if len(startNodeNames) == 0:
182 'There are no starting nodes in this graph - non-DAG graphs are not supported')
184 msg.debug(
'Found this list of start nodes for toposort: {0}'.format(startNodeNames))
187 while len(startNodeNames) > 0:
189 theNodeName = startNodeNames.pop()
190 theNode = graphCopy[theNodeName]
192 del graphCopy[theNodeName]
195 msg.debug(
'Considering connections from node {0}'.format(theNodeName))
196 for connectedNodeName
in theNode.connections[
'out']:
197 graphCopy[connectedNodeName].delConnection(toExe = theNodeName, direction =
'in')
199 if len(graphCopy[connectedNodeName].connections[
'in']) == 0:
200 startNodeNames.append(connectedNodeName)
203 if len(graphCopy) > 0:
205 'Graph topological sort had no more start nodes, but nodes were left {0} - non-DAG graphs are not supported'.format(list(graphCopy)))
207 msg.debug(
'Topologically sorted node order: {0}'.format(self.
_toposort))
213 for dataType
in self.
_nodeDict[nodeName].inputDataTypes:
216 for dataType
in self.
_nodeDict[nodeName].outputDataTypes:
220 msg.debug(
'Topologically sorted data order: {0}'.format(self.
_toposortData))
230 for nodeName, node
in self.
_nodeDict.items():
231 if len(self.
_nodeDict) == 1
and node.inputDataTypes ==
set()
and node.inputDataTypes ==
set():
232 self.
_execution[nodeName] = {
'enabled' :
True,
'input' :
set(),
'output' :
set()}
234 self.
_execution[nodeName] = {
'enabled' :
False,
'input' :
set(),
'output' :
set()}
237 dataAvailable = copy.deepcopy(self.
_inputData)
240 while len(dataToProduce) > 0:
243 if dataType
in dataToProduce:
244 nextDataType = dataType
245 dataToProduce.remove(nextDataType)
246 dataAvailable.update([nextDataType])
250 msg.error(
'Still have to produce data type(s) {0}, but did not find anything in the toposorted data list ({1}).'
251 ' Transform parameters/graph are broken so aborting.'.format(dataToProduce, self.
_toposortData))
253 'Data type graph error')
255 msg.debug(
'Next data type to try is {0}'.format(nextDataType))
256 bestPath = self.
_bestPath(nextDataType, dataAvailable)
258 msg.debug(
'Found best path for {0}: {1}'.format(nextDataType, bestPath))
261 modPath = bestPath.path + [
None]
262 for (nodeName, nextNodeName)
in [ (n, modPath[modPath.index(n)+1])
for n
in bestPath.path ]:
265 if nodeName
in bestPath.newData:
266 self.
_execution[nodeName][
'output'].update(bestPath.newData[nodeName])
267 for newData
in bestPath.newData[nodeName]:
268 if newData
not in dataAvailable:
269 dataToProduce.update([newData])
271 self.
_execution[nextNodeName][
'input'].update(bestPath.newData[nodeName])
272 if nextNodeName
in bestPath.extraData:
273 self.
_execution[nextNodeName][
'input'].update(bestPath.extraData[nodeName])
275 for extraNodeData
in bestPath.extraData.values():
276 for extra
in extraNodeData:
277 if extra
not in dataAvailable:
278 dataToProduce.update([extra])
282 msg.debug(
'Removing fake data from node {0}'.format(node))
283 props[
'input'] -=
set([
'inNULL',
'outNULL'])
284 props[
'output'] -=
set([
'inNULL',
'outNULL'])
286 msg.debug(
'Execution dictionary: {0}'.format(self.
_execution))
299 def _bestPath(self, data, dataAvailable, startNodeName = '_start', endNodeName = None):
301 if endNodeName
is None:
302 endNodeName =
'_end_{0}'.format(data)
306 'Node {0} was not found - the transform data connection definition is broken'.format(endNodeName))
311 pathSet = [
graphPath(endNodeName, data),]
313 msg.debug(
'Started path finding with seed path {0}'.format(pathSet[0]))
316 while len(pathSet) > 1
or pathSet[0].path[0] != startNodeName:
317 msg.debug(
'Starting best path iteration with {0} paths in {1}'.format(len(pathSet), pathSet))
319 for path
in pathSet[:]:
320 msg.debug(
'Continuing path finding with path {0}'.format(path))
321 currentNodeName = path.path[0]
322 if currentNodeName == startNodeName:
323 msg.debug(
'Path {0} has reached the start node - finished'.format(path))
326 if len(self.
_nodeDict[currentNodeName].connections[
'in']) == 0:
327 msg.debug(
'Path {0} is a dead end - removing'.format(path))
331 if len(self.
_nodeDict[currentNodeName].connections[
'in']) == 1:
332 msg.debug(
'Single exit from path {0} - adding connection to {1}'.format(path, list(self.
_nodeDict[currentNodeName].connections[
'in'])[0]))
333 self.
_extendPath(path, currentNodeName, list(self.
_nodeDict[currentNodeName].connections[
'in'])[0])
336 msg.debug(
'Multiple exits from path {0} - will clone for each extra exit'.format([path]))
337 for nextNodeName
in list(self.
_nodeDict[currentNodeName].connections[
'in'])[1:]:
338 newPath = copy.deepcopy(path)
339 msg.debug(
'Cloned exit from path {0} to {1}'.format(newPath, nextNodeName))
340 self.
_extendPath(newPath, currentNodeName, nextNodeName)
341 pathSet.append(newPath)
343 msg.debug(
'Adding exit from original path {0} to {1}'.format(path, list(self.
_nodeDict[currentNodeName].connections[
'in'])[0]))
344 self.
_extendPath(path, currentNodeName, list(self.
_nodeDict[currentNodeName].connections[
'in'])[0])
347 lowestCostPath =
None
348 for path
in pathSet[:]:
349 currentNodeName = path.path[0]
350 if currentNodeName == startNodeName:
351 if lowestCostPath
is None:
352 lowestCostPath = path
354 if path.cost >= lowestCostPath.cost:
355 msg.debug(
'Path {0} is no cheaper than best path {1} - removing'.format(path, lowestCostPath))
358 msg.debug(
'Path {0} is cheaper than previous best path {1} - removing previous'.format(path, lowestCostPath))
359 pathSet.remove(lowestCostPath)
360 lowestCostPath = path
363 if len(pathSet) == 0:
365 'No path found between {0} and {1} for {2}'.format(startNodeName, endNodeName, data))
373 edgeData = self.
_nodeDict[currentNodeName].connections[
'in'][nextNodeName]
374 msg.debug(
'Connecting {0} to {1} with data {2}'.format(currentNodeName, nextNodeName, edgeData))
377 if self.
_execution[currentNodeName][
'enabled']
is True:
380 for edgeDataElement
in edgeData:
382 if edgeDataElement
in self.
_nodeDict[currentNodeName].inData:
383 extraCost = self.
_nodeDict[currentNodeName].weights[edgeDataElement]
388 for nodeStartData
in self.
_nodeDict[currentNodeName].inData:
389 if isinstance(nodeStartData, (list, tuple))
and edgeDataElement
in nodeStartData:
390 extraCost = self.
_nodeDict[currentNodeName].weights[nodeStartData]
391 msg.debug(
'Found multi-data exit from {0} to {1} - adding {2} to data requirements'.format(currentNodeName, nextNodeName, nodeStartData))
392 extraData.update(nodeStartData)
395 extraData.difference_update(edgeData)
397 msg.debug(
'Updating path {0} with {1}, {2}, {3}, {4}'.format(path, nextNodeName, edgeData, extraData, extraCost))
398 path.addToPath(nextNodeName, edgeData, extraData, extraCost)
409 for nodeName
in nodeNames:
410 if not nodeName.startswith(
'_'):
411 nodeStrList.append(str(self.
_nodeDict[nodeName]))
412 return os.linesep.join(nodeStrList)
423 for nodeName
in nodeNames:
424 nodeStrList.append(repr(self.
_nodeDict[nodeName]))
425 return os.linesep.join(nodeStrList)
439 def __init__(self, name, inData, outData, weight = None):
450 elif isinstance(weight, int):
515 for data
in startSet:
516 if isinstance(data, (list, tuple)):
517 flatData.update(data)
519 flatData.update([data])
536 def __init__(self, executor = None, weight = None):
537 super(executorNode, self).
__init__(executor.name, executor.inData, executor.outData, weight)
574 def addToPath(self, newNodeName, newData = set(), extraData =
set(), extraCost = 0):
575 self.
_path.insert(0, newNodeName)
576 self.
_newData[newNodeName] = newData
577 self.
_cost += extraCost
Simple graph object describing the links between executors.
__init__(self, executorSet, inputData=set([]), outputData=set([]))
Initialise executor graph.
_bestPath(self, data, dataAvailable, startNodeName='_start', endNodeName=None)
Find the best path from an end to a start node, producing a certain type of data given the set of curr...
doToposort(self)
Find a topologically sorted list of the graph nodes.
__str__(self)
Nodes in topologically sorted order, if available, else sorted name order.
_extendPath(self, path, currentNodeName, nextNodeName)
Connect a path to a particular node.
addNode(self, executor)
Add an executor node to the graph.
findConnections(self)
Look at executor nodes and work out how they are connected.
__repr__(self)
Nodes in topologically sorted order, if available, else sorted name order.
execution(self)
Return a list of execution nodes with their data inputs/outputs.
findExecutionPath(self)
Find the graph's execution nodes, from input to output data types with each activated step and the in...
deleteNote(self, executor)
Remove an executor node from the graph.
Initialise a graph node from an executor.
__init__(self, executor=None, weight=None)
executorNode constructor
_flattenSet(self, startSet)
Take a list and return all simple members plus the members of any lists/tuples in the set (i....
delConnection(self, toExe, direction='out')
Delete a connection from this node.
addConnection(self, toExe, data, direction='out')
Add a new edge connection for this node.
__init__(self, name, inData, outData, weight=None)
Graph node constructor.
resetConnections(self)
Delete all connections.
Path object holding a list of nodes and data types which trace a single path through the graph.
addToPath(self, newNodeName, newData=set(), extraData=set(), extraCost=0)
__init__(self, endNodeName, data, cost=0)
graphPath constructor