001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.planner;
022    
023    import java.util.ArrayList;
024    import java.util.HashMap;
025    import java.util.LinkedList;
026    import java.util.List;
027    import java.util.Map;
028    
029    import cascading.flow.FlowElement;
030    import cascading.pipe.Group;
031    import cascading.pipe.Splice;
032    import cascading.tap.Tap;
033    import org.jgrapht.GraphPath;
034    import org.jgrapht.Graphs;
035    import org.jgrapht.alg.KShortestPaths;
036    import org.jgrapht.graph.SimpleDirectedGraph;
037    
038    /**
039     *
040     */
041    public class ElementGraphs
042      {
043      /**
044       * Method getAllShortestPathsBetween ...
045       *
046       * @param graph
047       * @param from  of type FlowElement
048       * @param to    of type FlowElement
049       * @return List<GraphPath<FlowElement, Scope>>
050       */
051      public static List<GraphPath<FlowElement, Scope>> getAllShortestPathsBetween( SimpleDirectedGraph<FlowElement, Scope> graph, FlowElement from, FlowElement to )
052        {
053        List<GraphPath<FlowElement, Scope>> paths = new KShortestPaths<FlowElement, Scope>( graph, from, Integer.MAX_VALUE ).getPaths( to );
054    
055        if( paths == null )
056          return new ArrayList<GraphPath<FlowElement, Scope>>();
057    
058        return paths;
059        }
060    
061      public static List<List<FlowElement>> asPathList( List<GraphPath<FlowElement, Scope>> paths )
062        {
063        List<List<FlowElement>> results = new LinkedList<List<FlowElement>>();
064    
065        if( paths == null )
066          return results;
067    
068        for( GraphPath<FlowElement, Scope> path : paths )
069          results.add( Graphs.getPathVertexList( path ) );
070    
071        return results;
072        }
073    
074      /**
075       * All paths that lead from to to without crossing a Tap/Group boundary
076       *
077       * @param graph
078       * @param from
079       * @param to
080       * @return of type List
081       */
082      public static List<GraphPath<FlowElement, Scope>> getAllDirectPathsBetween( SimpleDirectedGraph<FlowElement, Scope> graph, FlowElement from, FlowElement to )
083        {
084        List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( graph, from, to );
085        List<GraphPath<FlowElement, Scope>> results = new ArrayList<GraphPath<FlowElement, Scope>>( paths );
086    
087        for( GraphPath<FlowElement, Scope> path : paths )
088          {
089          List<FlowElement> pathVertexList = Graphs.getPathVertexList( path );
090    
091          for( int i = 1; i < pathVertexList.size() - 1; i++ ) // skip the from and to, its a Tap or Group
092            {
093            FlowElement flowElement = pathVertexList.get( i );
094    
095            if( flowElement instanceof Tap || flowElement instanceof Group )
096              {
097              results.remove( path );
098              break;
099              }
100            }
101          }
102    
103        return results;
104        }
105    
106      public static int countTypesBetween( SimpleDirectedGraph<FlowElement, Scope> graph, FlowElement from, Splice to, Class type )
107        {
108        List<GraphPath<FlowElement, Scope>> paths = getAllDirectPathsBetween( graph, from, to );
109    
110        int count = 0;
111    
112        for( GraphPath<FlowElement, Scope> path : paths )
113          {
114          if( hasIntermediateTap( path, from ) )
115            continue;
116    
117          List<FlowElement> flowElements = Graphs.getPathVertexList( path );
118    
119          for( FlowElement flowElement : flowElements )
120            {
121            if( type.isInstance( flowElement ) && flowElement != to )
122              count++;
123            }
124          }
125    
126        return count;
127        }
128    
129      /**
130       * for every incoming stream to the splice, gets the count of paths.
131       * <p/>
132       * covers the case where a source may cross multiple joins to the current join and still land
133       * on the lhs or rhs.
134       *
135       * @param graph
136       * @param from
137       * @param to
138       * @return of type Map
139       */
140      public static Map<Integer, Integer> countOrderedDirectPathsBetween( SimpleDirectedGraph<FlowElement, Scope> graph, FlowElement from, Splice to )
141        {
142        return countOrderedDirectPathsBetween( graph, from, to, false );
143        }
144    
145      public static Map<Integer, Integer> countOrderedDirectPathsBetween( SimpleDirectedGraph<FlowElement, Scope> graph, FlowElement from, Splice to, boolean skipTaps )
146        {
147        List<GraphPath<FlowElement, Scope>> paths = getAllDirectPathsBetween( graph, from, to );
148    
149        Map<Integer, Integer> results = new HashMap<Integer, Integer>();
150    
151        for( GraphPath<FlowElement, Scope> path : paths )
152          {
153          if( skipTaps && hasIntermediateTap( path, from ) )
154            continue;
155    
156          pathPositionInto( results, path, to );
157          }
158    
159        return results;
160        }
161    
162      public static boolean isBothAccumulatedAndStreamedPath( Map<Integer, Integer> pathCounts )
163        {
164        return pathCounts.size() > 1 && pathCounts.containsKey( 0 );
165        }
166    
167      public static boolean isOnlyStreamedPath( Map<Integer, Integer> pathCounts )
168        {
169        return pathCounts.size() == 1 && pathCounts.containsKey( 0 );
170        }
171    
172      public static boolean isOnlyAccumulatedPath( Map<Integer, Integer> pathCounts )
173        {
174        return pathCounts.size() >= 1 && !pathCounts.containsKey( 0 );
175        }
176    
177      public static int countPaths( Map<Integer, Integer> pathCounts )
178        {
179        int count = 0;
180    
181        for( Integer integer : pathCounts.values() )
182          count += integer;
183    
184        return count;
185        }
186    
187      private static boolean hasIntermediateTap( GraphPath<FlowElement, Scope> path, FlowElement from )
188        {
189        List<FlowElement> flowElements = Graphs.getPathVertexList( path );
190    
191        for( FlowElement flowElement : flowElements )
192          {
193          if( flowElement instanceof Tap && flowElement != from )
194            return true;
195          }
196    
197        return false;
198        }
199    
200      public static int pathPositionInto( GraphPath<FlowElement, Scope> path, Splice to )
201        {
202        List<FlowElement> flowElements = Graphs.getPathVertexList( path );
203        List<Scope> scopes = path.getEdgeList();
204        int index = flowElements.indexOf( to );
205    
206        return to.getPipePos().get( scopes.get( index - 1 ).getName() );
207        }
208    
209      private static Map<Integer, Integer> pathPositionInto( Map<Integer, Integer> results, GraphPath<FlowElement, Scope> path, Splice to )
210        {
211        List<Scope> scopes = path.getEdgeList();
212    
213        Scope lastScope = scopes.get( scopes.size() - 1 );
214    
215        Integer pos = to.getPipePos().get( lastScope.getName() );
216    
217        if( results.containsKey( pos ) )
218          results.put( pos, results.get( pos ) + 1 );
219        else
220          results.put( pos, 1 );
221    
222        return results;
223        }
224      }