001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.process; 022 023import java.util.Collections; 024import java.util.Comparator; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029 030import cascading.flow.FlowElement; 031import cascading.flow.FlowNode; 032import cascading.flow.planner.BaseFlowNode; 033import cascading.flow.planner.FlowPlanner; 034import cascading.flow.planner.graph.ElementGraph; 035import cascading.flow.planner.graph.FlowElementGraph; 036 037import static cascading.util.Util.createIdentitySet; 038 039/** 040 * 041 */ 042public class FlowNodeGraph extends BaseProcessGraph<FlowNode> 043 { 044 public static final FlowNodeComparator FLOW_NODE_COMPARATOR = new FlowNodeComparator(); 045 046 /** 047 * Class FlowNodeComparator provides a consistent tie breaker when ordering nodes topologically. 048 * <p/> 049 * This should have no effect on submission and execution priority as all FlowNodes are submitted simultaneously. 050 */ 051 public static class FlowNodeComparator implements Comparator<FlowNode> 052 { 053 @Override 054 public int compare( FlowNode lhs, FlowNode rhs ) 055 { 056 // larger graph first 057 int lhsSize = lhs.getElementGraph().vertexSet().size(); 058 int rhsSize = rhs.getElementGraph().vertexSet().size(); 059 int result = ( lhsSize < rhsSize ) ? -1 : ( ( lhsSize == rhsSize ) ? 0 : 1 ); 060 061 if( result != 0 ) 062 return result; 063 064 // more inputs second 065 lhsSize = lhs.getSourceElements().size(); 066 rhsSize = rhs.getSourceElements().size(); 067 068 return ( lhsSize < rhsSize ) ? -1 : ( ( lhsSize == rhsSize ) ? 0 : 1 ); 069 } 070 } 071 072 public FlowNodeGraph() 073 { 074 } 075 076 public FlowNodeGraph( FlowPlanner<?, ?> flowPlanner, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs ) 077 { 078 this( flowPlanner, flowElementGraph, nodeSubGraphs, Collections.<ElementGraph, List<? extends ElementGraph>>emptyMap() ); 079 } 080 081 public FlowNodeGraph( FlowPlanner<?, ?> flowPlanner, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap ) 082 { 083 buildGraph( flowPlanner, flowElementGraph, nodeSubGraphs, pipelineSubGraphsMap ); 084 085 // consistently sets ordinal of node based on topological dependencies and tie breaking by the given Comparator 086 Iterator<FlowNode> iterator = getOrderedTopologicalIterator(); 087 088 int ordinal = 0; 089 int size = vertexSet().size(); 090 091 while( iterator.hasNext() ) 092 { 093 BaseFlowNode next = (BaseFlowNode) iterator.next(); 094 095 next.setOrdinal( ordinal ); 096 next.setName( flowPlanner.makeFlowNodeName( next, size, ordinal ) ); 097 098 ordinal++; 099 } 100 } 101 102 protected void buildGraph( FlowPlanner<?, ?> flowPlanner, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap ) 103 { 104 for( ElementGraph nodeSubGraph : nodeSubGraphs ) 105 { 106 List<? extends ElementGraph> pipelineGraphs = pipelineSubGraphsMap.get( nodeSubGraph ); 107 108 FlowNode flowNode = flowPlanner.createFlowNode( flowElementGraph, nodeSubGraph, pipelineGraphs ); 109 110 addVertex( flowNode ); 111 } 112 113 bindEdges(); 114 } 115 116 public Set<FlowElement> getFlowElementsFor( Enum annotation ) 117 { 118 Set<FlowElement> results = createIdentitySet(); 119 120 for( FlowNode flowNode : vertexSet() ) 121 results.addAll( flowNode.getFlowElementsFor( annotation ) ); 122 123 return results; 124 } 125 126 public Iterator<FlowNode> getOrderedTopologicalIterator() 127 { 128 return super.getOrderedTopologicalIterator( FLOW_NODE_COMPARATOR ); 129 } 130 }