001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.iso.subgraph.iterator;
023
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Set;
030
031import cascading.flow.FlowElement;
032import cascading.flow.planner.Scope;
033import cascading.flow.planner.graph.ElementGraph;
034import cascading.flow.planner.graph.ElementSubGraph;
035import cascading.flow.planner.iso.ElementAnnotation;
036import cascading.flow.planner.iso.subgraph.SubGraphIterator;
037import cascading.util.EnumMultiMap;
038import cascading.util.Pair;
039import org.jgrapht.GraphPath;
040
041import static cascading.flow.planner.graph.ElementGraphs.*;
042import static cascading.util.Util.getFirst;
043
044/**
045 *
046 */
047public class UniquePathSubGraphIterator implements SubGraphIterator
048  {
049  SubGraphIterator subGraphIterator;
050  boolean longestFirst;
051  boolean multiEdge;
052
053  ElementGraph current = null;
054  Iterator<GraphPath<FlowElement, Scope>> pathsIterator;
055  Set<Pair<FlowElement, FlowElement>> pairs = new HashSet<>();
056
057  public UniquePathSubGraphIterator( SubGraphIterator subGraphIterator, boolean longestFirst, boolean multiEdge )
058    {
059    this.subGraphIterator = subGraphIterator;
060    this.longestFirst = longestFirst;
061    this.multiEdge = multiEdge;
062    }
063
064  public Set<Pair<FlowElement, FlowElement>> getPairs()
065    {
066    return pairs;
067    }
068
069  @Override
070  public ElementGraph getElementGraph()
071    {
072    return subGraphIterator.getElementGraph();
073    }
074
075  @Override
076  public EnumMultiMap getAnnotationMap( ElementAnnotation[] annotations )
077    {
078    return subGraphIterator.getAnnotationMap( annotations ); // unsure we need to narrow results
079    }
080
081  @Override
082  public boolean hasNext()
083    {
084    if( pathsIterator == null )
085      advance();
086
087    if( current == null || pathsIterator == null )
088      return false;
089
090    boolean hasNextPath = pathsIterator.hasNext();
091
092    if( hasNextPath )
093      return true;
094
095    return subGraphIterator.hasNext();
096    }
097
098  private void advance()
099    {
100    if( current == null )
101      {
102      if( !subGraphIterator.hasNext() )
103        return;
104
105      current = subGraphIterator.next();
106      pathsIterator = null;
107      }
108
109    if( pathsIterator == null )
110      {
111      Set<FlowElement> sources = findSources( current, FlowElement.class );
112      Set<FlowElement> sinks = findSinks( current, FlowElement.class );
113
114      if( sources.size() > 1 || sinks.size() > 1 )
115        throw new IllegalArgumentException( "only supports single source and single sink graphs" );
116
117      FlowElement source = getFirst( sources );
118      FlowElement sink = getFirst( sinks );
119
120      pairs.add( new Pair<>( source, sink ) );
121
122      List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( current, source, sink );
123
124      if( longestFirst )
125        Collections.reverse( paths ); // break off longest paths into own partitions
126
127      pathsIterator = paths.iterator();
128      }
129    }
130
131  @Override
132  public ElementGraph next()
133    {
134    if( !pathsIterator.hasNext() )
135      {
136      current = null;
137      pathsIterator = null;
138
139      advance();
140
141      return next();
142      }
143
144    GraphPath<FlowElement, Scope> path = pathsIterator.next();
145    List<FlowElement> vertexList = path.getVertexList();
146    Collection<Scope> edgeList = path.getEdgeList();
147
148    if( multiEdge )
149      edgeList = getAllMultiEdgesBetween( edgeList, current );
150
151    return new ElementSubGraph( current, vertexList, edgeList );
152    }
153
154  @Override
155  public void remove()
156    {
157    subGraphIterator.remove();
158    }
159  }