001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.cascade.planner;
022
023import java.util.Comparator;
024import java.util.HashSet;
025import java.util.List;
026import java.util.PriorityQueue;
027import java.util.Set;
028
029import cascading.cascade.CascadeException;
030import cascading.flow.BaseFlow;
031import cascading.flow.Flow;
032import org.jgrapht.Graphs;
033import org.jgrapht.graph.SimpleDirectedGraph;
034import org.jgrapht.traverse.TopologicalOrderIterator;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 *
040 */
041public class FlowGraph extends SimpleDirectedGraph<Flow, Integer>
042  {
043  private static final Logger LOG = LoggerFactory.getLogger( FlowGraph.class );
044
045  public FlowGraph( IdentifierGraph identifierGraph )
046    {
047    super( Integer.class );
048
049    makeGraph( identifierGraph );
050
051    verifyNoCycles();
052    }
053
054  public TopologicalOrderIterator<Flow, Integer> getTopologicalIterator()
055    {
056    return new TopologicalOrderIterator<Flow, Integer>( this, new PriorityQueue<Flow>( 10, new Comparator<Flow>()
057    {
058    @Override
059    public int compare( Flow lhs, Flow rhs )
060      {
061      return Integer.valueOf( lhs.getSubmitPriority() ).compareTo( rhs.getSubmitPriority() );
062      }
063    } ) );
064    }
065
066  private void verifyNoCycles()
067    {
068    Set<Flow> flows = new HashSet<Flow>();
069
070    TopologicalOrderIterator<Flow, Integer> topoIterator = new TopologicalOrderIterator<Flow, Integer>( this );
071
072    while( topoIterator.hasNext() )
073      flows.add( topoIterator.next() );
074
075    if( flows.size() != vertexSet().size() )
076      throw new CascadeException( "there are likely cycles in the set of given flows, topological iterator cannot traverse flows with cycles" );
077    }
078
079  private void makeGraph( IdentifierGraph identifierGraph )
080    {
081    Set<String> identifiers = identifierGraph.vertexSet();
082
083    int count = 0;
084
085    for( String source : identifiers )
086      {
087      if( LOG.isDebugEnabled() )
088        LOG.debug( "handling flow source: {}", source );
089
090      List<String> sinks = Graphs.successorListOf( identifierGraph, source );
091
092      for( String sink : sinks )
093        {
094        if( LOG.isDebugEnabled() )
095          LOG.debug( "handling flow path: {} -> {}", source, sink );
096
097        Flow flow = identifierGraph.getEdge( source, sink ).flow;
098
099        addVertex( flow );
100
101        Set<BaseFlow.FlowHolder> previous = identifierGraph.incomingEdgesOf( source );
102
103        for( BaseFlow.FlowHolder previousFlow : previous )
104          {
105          addVertex( previousFlow.flow );
106
107          if( getEdge( previousFlow.flow, flow ) != null )
108            continue;
109
110          if( !addEdge( previousFlow.flow, flow, count++ ) )
111            throw new CascadeException( "unable to add path between: " + previousFlow.flow.getName() + " and: " + flow.getName() );
112          }
113        }
114      }
115    }
116  }