32 msg = logging.getLogger(__name__)
 
   34 import PyJobTransforms.trfExceptions 
as trfExceptions
 
   48     def __init__(self, executorSet, inputData = set([]), outputData = 
set([])):
 
   53         msg.info(
'Transform graph input data: {0}; output data {1}'.
format(inputData, outputData))
 
   55         if len(executorSet) == 1:
 
   58             executor = 
list(executorSet)[0]
 
   59             if len(executor._inData) == 0 
and len(executor._outData) == 0:
 
   60                 executor.inData = inputData
 
   61                 executor.outData = outputData
 
   63         for executor 
in executorSet:
 
   71         if len(dataOverlap) > 0:
 
   73                                                         'Transform definition error, you cannot produce and consume the same datatypes in a transform. Duplicated input/output types {0}.'.
format(
' '.
join(dataOverlap)))
 
   81         pseudoNodes[
'_start'] = 
graphNode(name=
'_start', inData=[], outData=self.
_inputData, weight = 0)
 
   83             for dataType 
in node.outputDataTypes:
 
   84                 endNodeName = 
'_end_{0}'.
format(dataType)
 
   85                 pseudoNodes[endNodeName] = 
graphNode(name=endNodeName, inData=[dataType], outData=[], weight = 0)
 
  117             if nodeName.startswith((
'_start', 
'_end')):
 
  119             if self.
_execution[nodeName][
'enabled'] 
is True:
 
  120                 exeList.append({
'name': nodeName, 
'input': self.
_execution[nodeName][
'input'], 
 
  121                                 'output': self.
_execution[nodeName][
'output']})
 
  130             if nodeName.startswith((
'_start', 
'_end')):
 
  132             if self.
_execution[nodeName][
'enabled'] 
is True:
 
  133                 dataset.update(self.
_execution[nodeName][
'input'])
 
  134                 dataset.update(self.
_execution[nodeName][
'output'])
 
  150             node.resetConnections()
 
  158                 if nodeNameA == nodeNameB:
 
  160                 dataIntersection = 
list(
set(nodeA.outputDataTypes) & 
set(nodeB.inputDataTypes))
 
  161                 msg.debug(
'Data connections between {0} and {1}: {2}'.
format(nodeNameA, nodeNameB, dataIntersection))
 
  162                 if len(dataIntersection) > 0:
 
  163                     nodeA.addConnection(nodeNameB, dataIntersection, direction=
'out')
 
  164                     nodeB.addConnection(nodeNameA, dataIntersection, direction=
'in')
 
  166         msg.debug(
'Graph connections are: \n{0}'.
format(self))
 
  173         graphCopy = copy.deepcopy(self.
_nodeDict)
 
  176         for nodeName, node 
in graphCopy.items():
 
  177             if len(node.connections[
'in']) == 0:
 
  178                 startNodeNames.append(nodeName)
 
  180         if len(startNodeNames) == 0:
 
  182                                                         'There are no starting nodes in this graph - non-DAG graphs are not supported')
 
  184         msg.debug(
'Found this list of start nodes for toposort: {0}'.
format(startNodeNames))
 
  187         while len(startNodeNames) > 0:
 
  189             theNodeName = startNodeNames.pop()
 
  190             theNode = graphCopy[theNodeName]
 
  192             del graphCopy[theNodeName]
 
  195             msg.debug(
'Considering connections from node {0}'.
format(theNodeName))
 
  196             for connectedNodeName 
in theNode.connections[
'out']:
 
  197                 graphCopy[connectedNodeName].delConnection(toExe = theNodeName, direction = 
'in')
 
  199                 if len(graphCopy[connectedNodeName].connections[
'in']) == 0:
 
  200                     startNodeNames.append(connectedNodeName)
 
  203         if len(graphCopy) > 0:
 
  205                                                         'Graph topological sort had no more start nodes, but nodes were left {0} - non-DAG graphs are not supported'.
format(
list(graphCopy)))
 
  207         msg.debug(
'Topologically sorted node order: {0}'.
format(self.
_toposort))
 
  213             for dataType 
in self.
_nodeDict[nodeName].inputDataTypes:
 
  216             for dataType 
in self.
_nodeDict[nodeName].outputDataTypes:
 
  231             if len(self.
_nodeDict) == 1 
and node.inputDataTypes == 
set() 
and node.inputDataTypes == 
set():
 
  232                 self.
_execution[nodeName] = {
'enabled' : 
True, 
'input' : 
set(), 
'output' : 
set()}
 
  234                 self.
_execution[nodeName] = {
'enabled' : 
False, 
'input' : 
set(), 
'output' : 
set()}
 
  237         dataAvailable = copy.deepcopy(self.
_inputData)
 
  240         while len(dataToProduce) > 0:
 
  243                 if dataType 
in dataToProduce:
 
  244                     nextDataType = dataType
 
  245                     dataToProduce.remove(nextDataType)
 
  246                     dataAvailable.update([nextDataType])
 
  250                 msg.error(
'Still have to produce data type(s) {0}, but did not find anything in the toposorted data list ({1}).'  
  251                           ' Transform parameters/graph are broken so aborting.'.
format(dataToProduce, self.
_toposortData))
 
  253                                                             'Data type graph error')
 
  255             msg.debug(
'Next data type to try is {0}'.
format(nextDataType))
 
  256             bestPath = self.
_bestPath(nextDataType, dataAvailable)
 
  258             msg.debug(
'Found best path for {0}: {1}'.
format(nextDataType, bestPath))
 
  261             modPath = bestPath.path + [
None]
 
  262             for (nodeName, nextNodeName) 
in [ (n, modPath[modPath.index(n)+1]) 
for n 
in bestPath.path ]:
 
  265                 if nodeName 
in bestPath.newData:
 
  266                     self.
_execution[nodeName][
'output'].update(bestPath.newData[nodeName])
 
  267                     for newData 
in bestPath.newData[nodeName]:
 
  268                         if newData 
not in dataAvailable:
 
  269                             dataToProduce.update([newData])
 
  271                     self.
_execution[nextNodeName][
'input'].update(bestPath.newData[nodeName])
 
  272                     if nextNodeName 
in bestPath.extraData:
 
  273                         self.
_execution[nextNodeName][
'input'].update(bestPath.extraData[nodeName])
 
  275                 for extraNodeData 
in bestPath.extraData.values():
 
  276                     for extra 
in extraNodeData:
 
  277                         if extra 
not in dataAvailable:
 
  278                             dataToProduce.update([extra])
 
  282             msg.debug(
'Removing fake data from node {0}'.
format(node))
 
  283             props[
'input'] -= 
set([
'inNULL', 
'outNULL'])
 
  284             props[
'output'] -= 
set([
'inNULL', 
'outNULL'])
 
  299     def _bestPath(self, data, dataAvailable, startNodeName = '_start', endNodeName = None):
 
  301         if endNodeName 
is None:
 
  302             endNodeName = 
'_end_{0}'.
format(data)
 
  306                 'Node {0} was not found - the transform data connection definition is broken'.
format(endNodeName))
 
  311         pathSet = [
graphPath(endNodeName, data),]
 
  313         msg.debug(
'Started path finding with seed path {0}'.
format(pathSet[0]))
 
  316         while len(pathSet) > 1 
or pathSet[0].path[0] != startNodeName:
 
  317             msg.debug(
'Starting best path iteration with {0} paths in {1}'.
format(len(pathSet), pathSet))
 
  319             for path 
in pathSet[:]:
 
  320                 msg.debug(
'Continuing path finding with path {0}'.
format(path))
 
  321                 currentNodeName = path.path[0]
 
  322                 if currentNodeName == startNodeName:
 
  323                     msg.debug(
'Path {0} has reached the start node - finished'.
format(path))
 
  326                 if len(self.
_nodeDict[currentNodeName].connections[
'in']) == 0:
 
  327                     msg.debug(
'Path {0} is a dead end - removing'.
format(path))
 
  331                 if len(self.
_nodeDict[currentNodeName].connections[
'in']) == 1:
 
  332                     msg.debug(
'Single exit from path {0} - adding connection to {1}'.
format(path, 
list(self.
_nodeDict[currentNodeName].connections[
'in'])[0]))
 
  336                 msg.debug(
'Multiple exits from path {0} - will clone for each extra exit'.
format([path]))
 
  337                 for nextNodeName 
in list(self.
_nodeDict[currentNodeName].connections[
'in'])[1:]:
 
  338                     newPath = copy.deepcopy(path)
 
  339                     msg.debug(
'Cloned exit from path {0} to {1}'.
format(newPath, nextNodeName))             
 
  340                     self.
_extendPath(newPath, currentNodeName, nextNodeName)
 
  341                     pathSet.append(newPath)
 
  343                 msg.debug(
'Adding exit from original path {0} to {1}'.
format(path, 
list(self.
_nodeDict[currentNodeName].connections[
'in'])[0]))
 
  347             lowestCostPath = 
None 
  348             for path 
in pathSet[:]:
 
  349                 currentNodeName = path.path[0]
 
  350                 if currentNodeName == startNodeName:
 
  351                     if lowestCostPath 
is None:
 
  352                         lowestCostPath = path
 
  354                     if path.cost >= lowestCostPath.cost:
 
  355                         msg.debug(
'Path {0} is no cheaper than best path {1} - removing'.
format(path, lowestCostPath))
 
  358                         msg.debug(
'Path {0} is cheaper than previous best path {1} - removing previous'.
format(path, lowestCostPath))
 
  359                         pathSet.remove(lowestCostPath)
 
  360                         lowestCostPath = path
 
  363             if len(pathSet) == 0:
 
  365                                                             'No path found between {0} and {1} for {2}'.
format(startNodeName, endNodeName, data))
 
  373         edgeData = self.
_nodeDict[currentNodeName].connections[
'in'][nextNodeName]
 
  374         msg.debug(
'Connecting {0} to {1} with data {2}'.
format(currentNodeName, nextNodeName, edgeData))
 
  377         if self.
_execution[currentNodeName][
'enabled'] 
is True:
 
  380             for edgeDataElement 
in edgeData:
 
  382                 if edgeDataElement 
in self.
_nodeDict[currentNodeName].inData:
 
  383                     extraCost = self.
_nodeDict[currentNodeName].weights[edgeDataElement]
 
  388                     for nodeStartData 
in self.
_nodeDict[currentNodeName].inData:
 
  389                         if isinstance(nodeStartData, (list, tuple)) 
and edgeDataElement 
in nodeStartData:
 
  390                             extraCost = self.
_nodeDict[currentNodeName].weights[nodeStartData]
 
  391                             msg.debug(
'Found multi-data exit from {0} to {1} - adding {2} to data requirements'.
format(currentNodeName, nextNodeName, nodeStartData))
 
  392                             extraData.update(nodeStartData)
 
  395             extraData.difference_update(edgeData)
 
  397         msg.debug(
'Updating path {0} with {1}, {2}, {3}, {4}'.
format(path, nextNodeName, edgeData, extraData, extraCost))
 
  398         path.addToPath(nextNodeName, edgeData, extraData, extraCost)
 
  409         for nodeName 
in nodeNames:
 
  410             if not nodeName.startswith(
'_'): 
 
  412         return os.linesep.join(nodeStrList)
 
  423         for nodeName 
in nodeNames:
 
  425         return os.linesep.join(nodeStrList)
 
  439     def __init__(self, name, inData, outData, weight = None):
 
  450         elif isinstance(weight, int):
 
  515         for data 
in startSet:
 
  516             if isinstance(data, (list, tuple)):
 
  517                 flatData.update(data)
 
  519                 flatData.update([data])
 
  536     def __init__(self, executor = None, weight = None):
 
  537         super(executorNode, self).
__init__(executor.name, executor.inData, executor.outData, weight)
 
  574     def addToPath(self, newNodeName, newData = set(), extraData = 
set(), extraCost = 0):
 
  575         self.
_path.insert(0, newNodeName)
 
  576         self.
_newData[newNodeName] = newData
 
  577         self.
_cost += extraCost