/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import cascading.flow.FlowElement;
import cascading.pipe.Group;
import cascading.pipe.Splice;
import cascading.tap.Tap;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.jgrapht.alg.KShortestPaths;
import org.jgrapht.graph.SimpleDirectedGraph;

/**
 * Static helpers for enumerating and classifying the paths between two {@link FlowElement}
 * vertices of a {@link SimpleDirectedGraph} whose edges are {@link Scope} instances.
 */
public class ElementGraphs
{
  /**
   * Returns the paths {@link KShortestPaths} finds from {@code from} to {@code to}.
   * <p>
   * The search is asked for up to {@link Integer#MAX_VALUE} paths, so the result is not
   * limited to a fixed number of shortest paths.
   *
   * @param graph the graph to search
   * @param from  of type FlowElement, the start vertex
   * @param to    of type FlowElement, the end vertex
   * @return the paths found, or a new empty list when the search returns {@code null}; never {@code null}
   */
  public static List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetween( SimpleDirectedGraph<FlowElement, Scope> graph, FlowElement from, FlowElement to )
  {
    List<GraphPath<FlowElement, Scope>> paths = new KShortestPaths<FlowElement, Scope>( graph, from, Integer.MAX_VALUE ).getPaths( to );

    // normalize "no paths" to an empty list so callers can iterate without a null check
    if( paths == null )
      return new ArrayList<GraphPath<FlowElement, Scope>>();

    return paths;
  }

  /**
   * Converts each path into the ordered list of vertices it visits.
   *
   * @param paths the paths to convert, may be {@code null}
   * @return one vertex list per path, in the same order; empty when {@code paths} is {@code null}
   */
  public static List<List<FlowElement>> asPathList( List<GraphPath<FlowElement, Scope>> paths )
  {
    List<List<FlowElement>> results = new LinkedList<List<FlowElement>>();

    if( paths == null )
      return results;

    for( GraphPath<FlowElement, Scope> path : paths )
      results.add( Graphs.getPathVertexList( path ) );

    return results;
  }

  /**
   * All paths that lead from {@code from} to {@code to} without crossing a Tap/Group boundary,
   * that is, with no {@link Tap} or {@link Group} strictly between the two end points.
   *
   * @param graph the graph to search
   * @param from  the start vertex
   * @param to    the end vertex
   * @return of type List, a new list holding the qualifying paths in the order they were found
   */
  public static List<GraphPath<FlowElement, Scope>> getAllDirectPathsBetween( SimpleDirectedGraph<FlowElement, Scope> graph, FlowElement from, FlowElement to )
  {
    List<GraphPath<FlowElement, Scope>> candidates = getAllShortestPathsBetween( graph, from, to );
    List<GraphPath<FlowElement, Scope>> results = new ArrayList<GraphPath<FlowElement, Scope>>( candidates.size() );

    // append the keepers instead of copying everything and removing rejects one by one
    for( GraphPath<FlowElement, Scope> path : candidates )
    {
      if( !crossesTapOrGroup( path ) )
        results.add( path );
    }

    return results;
  }

  /**
   * Tests whether any vertex strictly between the first and last vertex of the path is a
   * {@link Tap} or a {@link Group}.
   */
  private static boolean crossesTapOrGroup( GraphPath<FlowElement, Scope> path )
  {
    List<FlowElement> pathVertexList = Graphs.getPathVertexList( path );

    for( int i = 1; i < pathVertexList.size() - 1; i++ ) // skip the from and to, its a Tap or Group
    {
      FlowElement flowElement = pathVertexList.get( i );

      if( flowElement instanceof Tap || flowElement instanceof Group )
        return true;
    }

    return false;
  }

  /**
   * Counts the elements of the given type found on the direct paths between {@code from} and {@code to}.
   * <p>
   * Paths containing a {@link Tap} other than {@code from} are ignored. The {@code to} element itself
   * is never counted, while {@code from} is counted when it is an instance of {@code type}. An element
   * lying on several paths is counted once per path.
   *
   * @param graph the graph to search
   * @param from  the start vertex
   * @param to    the splice the paths lead into
   * @param type  the class whose instances are counted
   * @return the number of matching elements summed over all considered paths
   */
  public static int countTypesBetween( SimpleDirectedGraph<FlowElement, Scope> graph, FlowElement from, Splice to, Class<?> type )
  {
    List<GraphPath<FlowElement, Scope>> paths = getAllDirectPathsBetween( graph, from, to );

    int count = 0;

    for( GraphPath<FlowElement, Scope> path : paths )
    {
      if( hasIntermediateTap( path, from ) )
        continue;

      for( FlowElement flowElement : Graphs.getPathVertexList( path ) )
      {
        if( type.isInstance( flowElement ) && flowElement != to )
          count++;
      }
    }

    return count;
  }

  /**
   * for every incoming stream to the splice, gets the count of paths.
   * <p>
   * covers the case where a source may cross multiple joins to the current join and still land
   * on the lhs or rhs.
   * <p>
   * Equivalent to calling the four argument variant with {@code skipTaps} set to {@code false}.
   *
   * @param graph the graph to search
   * @param from  the start vertex
   * @param to    the splice the paths lead into
   * @return of type Map, keyed by the pipe position on {@code to}, valued by the number of direct paths arriving there
   */
  public static Map<Integer, Integer> countOrderedDirectPathsBetween( SimpleDirectedGraph<FlowElement, Scope> graph, FlowElement from, Splice to )
  {
    return countOrderedDirectPathsBetween( graph, from, to, false );
  }

  /**
   * Counts the direct paths between {@code from} and {@code to}, grouped by the pipe position
   * on {@code to} at which the last edge of each path arrives.
   *
   * @param graph    the graph to search
   * @param from     the start vertex
   * @param to       the splice the paths lead into
   * @param skipTaps when {@code true}, paths containing a {@link Tap} other than {@code from} are not counted
   * @return a new map keyed by pipe position, valued by the number of paths arriving at that position
   */
  public static Map<Integer, Integer> countOrderedDirectPathsBetween( SimpleDirectedGraph<FlowElement, Scope> graph, FlowElement from, Splice to, boolean skipTaps )
  {
    List<GraphPath<FlowElement, Scope>> paths = getAllDirectPathsBetween( graph, from, to );

    Map<Integer, Integer> results = new HashMap<Integer, Integer>();

    for( GraphPath<FlowElement, Scope> path : paths )
    {
      if( skipTaps && hasIntermediateTap( path, from ) )
        continue;

      pathPositionInto( results, path, to );
    }

    return results;
  }

  /**
   * Returns true when the counts hold more than one position and one of them is position 0.
   * <p>
   * Going by the method names, position 0 is the streamed side and every other position is an
   * accumulated side.
   *
   * @param pathCounts path counts keyed by pipe position
   * @return true if position 0 and at least one other position are present
   */
  public static boolean isBothAccumulatedAndStreamedPath( Map<Integer, Integer> pathCounts )
  {
    return pathCounts.size() > 1 && pathCounts.containsKey( 0 );
  }

  /**
   * Returns true when position 0 is the only position present in the counts.
   *
   * @param pathCounts path counts keyed by pipe position
   * @return true if the only key is 0
   */
  public static boolean isOnlyStreamedPath( Map<Integer, Integer> pathCounts )
  {
    return pathCounts.size() == 1 && pathCounts.containsKey( 0 );
  }

  /**
   * Returns true when the counts are not empty and position 0 is absent.
   *
   * @param pathCounts path counts keyed by pipe position
   * @return true if at least one key is present and none of them is 0
   */
  public static boolean isOnlyAccumulatedPath( Map<Integer, Integer> pathCounts )
  {
    return pathCounts.size() >= 1 && !pathCounts.containsKey( 0 );
  }

  /**
   * Sums the path counts across all positions.
   *
   * @param pathCounts path counts keyed by pipe position
   * @return the total number of paths
   */
  public static int countPaths( Map<Integer, Integer> pathCounts )
  {
    int count = 0;

    for( int pathCount : pathCounts.values() )
      count += pathCount;

    return count;
  }

  /**
   * Tests whether the path visits any {@link Tap} that is not the {@code from} instance itself.
   */
  private static boolean hasIntermediateTap( GraphPath<FlowElement, Scope> path, FlowElement from )
  {
    List<FlowElement> flowElements = Graphs.getPathVertexList( path );

    for( FlowElement flowElement : flowElements )
    {
      if( flowElement instanceof Tap && flowElement != from )
        return true;
    }

    return false;
  }

  /**
   * Returns the pipe position on {@code to} at which the given path arrives.
   * <p>
   * The position is looked up in {@code to.getPipePos()} by the name of the edge immediately
   * preceding {@code to} on the path.
   *
   * @param path the path leading into {@code to}
   * @param to   the splice whose pipe position is wanted
   * @return the pipe position registered for the incoming edge
   * @throws IllegalArgumentException if {@code to} is not on the path, or is its first element and so has no incoming edge
   * @throws IllegalStateException    if no pipe position is registered under the incoming edge name
   */
  public static int pathPositionInto( GraphPath<FlowElement, Scope> path, Splice to )
  {
    List<FlowElement> flowElements = Graphs.getPathVertexList( path );
    int index = flowElements.indexOf( to );

    // fail with a meaningful message rather than an index error from the edge list lookup below
    if( index == -1 )
      throw new IllegalArgumentException( "splice is not on the given path: " + to );

    if( index == 0 )
      throw new IllegalArgumentException( "splice is the first element of the given path, no scope leads into it: " + to );

    // edge i connects vertex i to vertex i + 1, so the edge into 'to' sits at index - 1
    Scope incoming = path.getEdgeList().get( index - 1 );
    Integer position = to.getPipePos().get( incoming.getName() );

    // fail explicitly rather than with a bare NullPointerException on unboxing
    if( position == null )
      throw new IllegalStateException( "no pipe position found on splice: " + to + ", for scope name: " + incoming.getName() );

    return position;
  }

  /**
   * Increments, in {@code results}, the count for the pipe position at which the last edge of
   * the path arrives on {@code to}, then returns {@code results}.
   */
  private static Map<Integer, Integer> pathPositionInto( Map<Integer, Integer> results, GraphPath<FlowElement, Scope> path, Splice to )
  {
    List<Scope> scopes = path.getEdgeList();

    Scope lastScope = scopes.get( scopes.size() - 1 );

    Integer pos = to.getPipePos().get( lastScope.getName() );

    // values are only ever stored as non-null counts, so a null here means "not seen yet"
    Integer current = results.get( pos );

    results.put( pos, current == null ? 1 : current + 1 );

    return results;
  }
}