32 msg = logging.getLogger(__name__)
34 import PyJobTransforms.trfExceptions
as trfExceptions
# Constructor fragment: takes a set of executors plus the graph-level input
# and output data collections.
# NOTE(review): this listing carries embedded original line numbers and skips
# some of them (48 -> 53, 61 -> 63), so statements may be missing here.
# NOTE(review): the set([]) defaults are created once, when the function is
# defined, and shared by every call that omits the argument. The assignments at
# 60/61 bind those same objects onto the executor, so a later in-place change
# there could leak into other default-constructed graphs - confirm, and
# consider None defaults.
48 def __init__(self, executorSet, inputData = set([]), outputData =
set([])):
# Log the graph-level input and output data that were passed in.
53 msg.info(
'Transform graph input data: {0}; output data {1}'.
format(inputData, outputData))
# Special case: the executor set holds exactly one executor.
55 if len(executorSet) == 1:
58 executor =
list(executorSet)[0]
# If that executor has no input and no output data of its own, give it the
# graph-level data. The test reads _inData/_outData while the assignment goes
# through inData/outData (presumably property setters - not visible here).
59 if len(executor._inData) == 0
and len(executor._outData) == 0:
60 executor.inData = inputData
61 executor.outData = outputData
# Iterate over every executor in the set (the loop body is not in this listing).
63 for executor
in executorSet:
71 if len(dataOverlap) > 0:
73 'Transform definition error, you cannot produce and consume the same datatypes in a transform. Duplicated input/output types {0}.'.
format(
' '.
join(dataOverlap)))
81 pseudoNodes[
'_start'] =
graphNode(name=
'_start', inData=[], outData=self.
_inputData, weight = 0)
83 for dataType
in node.outputDataTypes:
84 endNodeName =
'_end_{0}'.
format(dataType)
85 pseudoNodes[endNodeName] =
graphNode(name=endNodeName, inData=[dataType], outData=[], weight = 0)
117 if nodeName.startswith((
'_start',
'_end')):
119 if self.
_execution[nodeName][
'enabled']
is True:
120 exeList.append({
'name': nodeName,
'input': self.
_execution[nodeName][
'input'],
121 'output': self.
_execution[nodeName][
'output']})
130 if nodeName.startswith((
'_start',
'_end')):
132 if self.
_execution[nodeName][
'enabled']
is True:
133 dataset.update(self.
_execution[nodeName][
'input'])
134 dataset.update(self.
_execution[nodeName][
'output'])
150 node.resetConnections()
158 if nodeNameA == nodeNameB:
160 dataIntersection =
list(
set(nodeA.outputDataTypes) &
set(nodeB.inputDataTypes))
161 msg.debug(
'Data connections between {0} and {1}: {2}'.
format(nodeNameA, nodeNameB, dataIntersection))
162 if len(dataIntersection) > 0:
163 nodeA.addConnection(nodeNameB, dataIntersection, direction=
'out')
164 nodeB.addConnection(nodeNameA, dataIntersection, direction=
'in')
166 msg.debug(
'Graph connections are: \n{0}'.
format(self))
173 graphCopy = copy.deepcopy(self.
_nodeDict)
176 for nodeName, node
in graphCopy.items():
177 if len(node.connections[
'in']) == 0:
178 startNodeNames.append(nodeName)
180 if len(startNodeNames) == 0:
182 'There are no starting nodes in this graph - non-DAG graphs are not supported')
184 msg.debug(
'Found this list of start nodes for toposort: {0}'.
format(startNodeNames))
187 while len(startNodeNames) > 0:
189 theNodeName = startNodeNames.pop()
190 theNode = graphCopy[theNodeName]
192 del graphCopy[theNodeName]
195 msg.debug(
'Considering connections from node {0}'.
format(theNodeName))
196 for connectedNodeName
in theNode.connections[
'out']:
197 graphCopy[connectedNodeName].delConnection(toExe = theNodeName, direction =
'in')
199 if len(graphCopy[connectedNodeName].connections[
'in']) == 0:
200 startNodeNames.append(connectedNodeName)
203 if len(graphCopy) > 0:
205 'Graph topological sort had no more start nodes, but nodes were left {0} - non-DAG graphs are not supported'.
format(
list(graphCopy)))
207 msg.debug(
'Topologically sorted node order: {0}'.
format(self.
_toposort))
213 for dataType
in self.
_nodeDict[nodeName].inputDataTypes:
216 for dataType
in self.
_nodeDict[nodeName].outputDataTypes:
# Initialise the execution record for nodeName with empty 'input' and 'output'
# sets; only the 'enabled' flag differs between the two assignments.
# NOTE(review): the condition compares node.inputDataTypes with set() twice.
# The second comparison is redundant as written and was presumably meant to
# test node.outputDataTypes - confirm against the intended behaviour.
231 if len(self.
_nodeDict) == 1
and node.inputDataTypes ==
set()
and node.inputDataTypes ==
set():
# Single-node graph whose node declares no input data types: start enabled.
232 self.
_execution[nodeName] = {
'enabled' :
True,
'input' :
set(),
'output' :
set()}
# NOTE(review): the embedded numbering skips 233, presumably an `else:` that
# this listing dropped. This assignment records the node as disabled.
234 self.
_execution[nodeName] = {
'enabled' :
False,
'input' :
set(),
'output' :
set()}
237 dataAvailable = copy.deepcopy(self.
_inputData)
240 while len(dataToProduce) > 0:
243 if dataType
in dataToProduce:
244 nextDataType = dataType
245 dataToProduce.remove(nextDataType)
246 dataAvailable.update([nextDataType])
250 msg.error(
'Still have to produce data type(s) {0}, but did not find anything in the toposorted data list ({1}).'
251 ' Transform parameters/graph are broken so aborting.'.
format(dataToProduce, self.
_toposortData))
253 'Data type graph error')
255 msg.debug(
'Next data type to try is {0}'.
format(nextDataType))
256 bestPath = self.
_bestPath(nextDataType, dataAvailable)
258 msg.debug(
'Found best path for {0}: {1}'.
format(nextDataType, bestPath))
261 modPath = bestPath.path + [
None]
262 for (nodeName, nextNodeName)
in [ (n, modPath[modPath.index(n)+1])
for n
in bestPath.path ]:
265 if nodeName
in bestPath.newData:
267 for newData
in bestPath.newData[nodeName]:
268 if newData
not in dataAvailable:
269 dataToProduce.update([newData])
272 if nextNodeName
in bestPath.extraData:
275 for extraNodeData
in bestPath.extraData.values():
276 for extra
in extraNodeData:
277 if extra
not in dataAvailable:
278 dataToProduce.update([extra])
282 msg.debug(
'Removing fake data from node {0}'.
format(node))
283 props[
'input'] -=
set([
'inNULL',
'outNULL'])
284 props[
'output'] -=
set([
'inNULL',
'outNULL'])
# Opening part of the best-path search for one data type.
#   data          - data type the path is sought for; also used to build the
#                   default end node name
#   dataAvailable - not referenced in the lines visible here
#   startNodeName - name of the start node, '_start' by default
#   endNodeName   - name of the end node; derived from data when None
299 def _bestPath(self, data, dataAvailable, startNodeName = '_start', endNodeName = None):
# With no explicit end node, use the name '_end_<data>'.
301 if endNodeName
is None:
302 endNodeName =
'_end_{0}'.
format(data)
# NOTE(review): the statement that opens this call (numbers 303-305) is not in
# the listing; only its message argument survives.
306 'Node {0} was not found - the transform data connection definition is broken'.
format(endNodeName))
# Seed the candidate list with a single graphPath built from the end node name
# and the data type.
311 pathSet = [
graphPath(endNodeName, data),]
313 msg.debug(
'Started path finding with seed path {0}'.
format(pathSet[0]))
316 while len(pathSet) > 1
or pathSet[0].path[0] != startNodeName:
317 msg.debug(
'Starting best path iteration with {0} paths in {1}'.
format(len(pathSet), pathSet))
319 for path
in pathSet[:]:
320 msg.debug(
'Continuing path finding with path {0}'.
format(path))
321 currentNodeName = path.path[0]
322 if currentNodeName == startNodeName:
323 msg.debug(
'Path {0} has reached the start node - finished'.
format(path))
326 if len(self.
_nodeDict[currentNodeName].connections[
'in']) == 0:
327 msg.debug(
'Path {0} is a dead end - removing'.
format(path))
331 if len(self.
_nodeDict[currentNodeName].connections[
'in']) == 1:
332 msg.debug(
'Single exit from path {0} - adding connection to {1}'.
format(path,
list(self.
_nodeDict[currentNodeName].connections[
'in'])[0]))
336 msg.debug(
'Multiple exits from path {0} - will clone for each extra exit'.
format([path]))
337 for nextNodeName
in list(self.
_nodeDict[currentNodeName].connections[
'in'])[1:]:
338 newPath = copy.deepcopy(path)
339 msg.debug(
'Cloned exit from path {0} to {1}'.
format(newPath, nextNodeName))
340 self.
_extendPath(newPath, currentNodeName, nextNodeName)
341 pathSet.append(newPath)
343 msg.debug(
'Adding exit from original path {0} to {1}'.
format(path,
list(self.
_nodeDict[currentNodeName].connections[
'in'])[0]))
347 lowestCostPath =
None
348 for path
in pathSet[:]:
349 currentNodeName = path.path[0]
350 if currentNodeName == startNodeName:
351 if lowestCostPath
is None:
352 lowestCostPath = path
354 if path.cost >= lowestCostPath.cost:
355 msg.debug(
'Path {0} is no cheaper than best path {1} - removing'.
format(path, lowestCostPath))
358 msg.debug(
'Path {0} is cheaper than previous best path {1} - removing previous'.
format(path, lowestCostPath))
359 pathSet.remove(lowestCostPath)
360 lowestCostPath = path
363 if len(pathSet) == 0:
365 'No path found between {0} and {1} for {2}'.
format(startNodeName, endNodeName, data))
373 edgeData = self.
_nodeDict[currentNodeName].connections[
'in'][nextNodeName]
374 msg.debug(
'Connecting {0} to {1} with data {2}'.
format(currentNodeName, nextNodeName, edgeData))
377 if self.
_execution[currentNodeName][
'enabled']
is True:
380 for edgeDataElement
in edgeData:
382 if edgeDataElement
in self.
_nodeDict[currentNodeName].inData:
383 extraCost = self.
_nodeDict[currentNodeName].weights[edgeDataElement]
388 for nodeStartData
in self.
_nodeDict[currentNodeName].inData:
389 if isinstance(nodeStartData, (list, tuple))
and edgeDataElement
in nodeStartData:
390 extraCost = self.
_nodeDict[currentNodeName].weights[nodeStartData]
391 msg.debug(
'Found multi-data exit from {0} to {1} - adding {2} to data requirements'.
format(currentNodeName, nextNodeName, nodeStartData))
392 extraData.update(nodeStartData)
395 extraData.difference_update(edgeData)
397 msg.debug(
'Updating path {0} with {1}, {2}, {3}, {4}'.
format(path, nextNodeName, edgeData, extraData, extraCost))
398 path.addToPath(nextNodeName, edgeData, extraData, extraCost)
409 for nodeName
in nodeNames:
410 if not nodeName.startswith(
'_'):
412 return os.linesep.join(nodeStrList)
423 for nodeName
in nodeNames:
425 return os.linesep.join(nodeStrList)
439 def __init__(self, name, inData, outData, weight = None):
450 elif isinstance(weight, int):
515 for data
in startSet:
516 if isinstance(data, (list, tuple)):
517 flatData.update(data)
519 flatData.update([data])
# Initialise the node from an executor: the executor's name, inData and outData
# are passed to the parent initialiser together with weight.
# NOTE(review): executor defaults to None but is dereferenced unconditionally,
# so calling this without an executor raises AttributeError - confirm whether
# the default is ever relied on.
536 def __init__(self, executor = None, weight = None):
537 super(executorNode, self).
__init__(executor.name, executor.inData, executor.outData, weight)
# Extend the path with a new node.
# NOTE(review): the set() defaults are created once and shared between calls.
# newData is stored by reference below, so every call that omits it would put
# the same set object into self._newData - confirm, and consider None defaults.
# NOTE(review): extraData is not used in the lines visible here; the listing
# may end before the method does.
574 def addToPath(self, newNodeName, newData = set(), extraData =
set(), extraCost = 0):
# Insert the new node name at the front of the path list.
575 self.
_path.insert(0, newNodeName)
# Record the data associated with this node, keyed by node name.
576 self.
_newData[newNodeName] = newData
# Add this step's cost to the running path cost.
577 self.
_cost += extraCost