001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.cascade.planner;
022    
023    import java.util.Iterator;
024    import java.util.LinkedList;
025    import java.util.ListIterator;
026    
027    import cascading.cascade.CascadeException;
028    import cascading.flow.BaseFlow;
029    import cascading.flow.Flow;
030    import cascading.tap.CompositeTap;
031    import cascading.tap.Tap;
032    import org.jgrapht.graph.SimpleDirectedGraph;
033    
034    /**
035     *
036     */
037    public abstract class TopologyGraph<Vertex> extends SimpleDirectedGraph<Vertex, BaseFlow.FlowHolder>
038      {
039      public TopologyGraph( Flow... flows )
040        {
041        super( BaseFlow.FlowHolder.class );
042    
043        makeGraph( flows );
044        }
045    
046      private void makeGraph( Flow[] flows )
047        {
048        for( Flow flow : flows )
049          {
050          LinkedList<Tap> sources = new LinkedList<Tap>( flow.getSourcesCollection() );
051          LinkedList<Tap> sinks = new LinkedList<Tap>( flow.getSinksCollection() );
052    
053          sinks.addAll( flow.getCheckpointsCollection() );
054    
055          unwrapCompositeTaps( sources );
056          unwrapCompositeTaps( sinks );
057    
058          for( Tap source : sources )
059            addVertex( getVertex( flow, source ) );
060    
061          for( Tap sink : sinks )
062            addVertex( getVertex( flow, sink ) );
063    
064          for( Tap source : sources )
065            {
066            for( Tap sink : sinks )
067              addEdgeFor( flow, source, sink );
068            }
069          }
070        }
071    
072      private void addEdgeFor( Flow flow, Tap source, Tap sink )
073        {
074        try
075          {
076          addEdge( getVertex( flow, source ), getVertex( flow, sink ), ( (BaseFlow) flow ).getHolder() );
077          }
078        catch( IllegalArgumentException exception )
079          {
080          throw new CascadeException( "no loops allowed in cascade, flow: " + flow.getName() + ", source: " + source + ", sink: " + sink );
081          }
082        }
083    
084      abstract protected Vertex getVertex( Flow flow, Tap tap );
085    
086      private void unwrapCompositeTaps( LinkedList<Tap> taps )
087        {
088        ListIterator<Tap> iterator = taps.listIterator();
089    
090        while( iterator.hasNext() )
091          {
092          Tap tap = iterator.next();
093    
094          if( tap instanceof CompositeTap )
095            {
096            iterator.remove();
097    
098            Iterator<Tap> childTaps = ( (CompositeTap) tap ).getChildTaps();
099    
100            while( childTaps.hasNext() )
101              {
102              iterator.add( childTaps.next() );
103              iterator.previous(); // force cursor backward
104              }
105            }
106          }
107        }
108      }