001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.planner.iso.subgraph.iterator; 023 024import java.util.Collection; 025import java.util.Collections; 026import java.util.HashSet; 027import java.util.Iterator; 028import java.util.List; 029import java.util.Set; 030 031import cascading.flow.FlowElement; 032import cascading.flow.planner.Scope; 033import cascading.flow.planner.graph.ElementGraph; 034import cascading.flow.planner.graph.ElementSubGraph; 035import cascading.flow.planner.iso.ElementAnnotation; 036import cascading.flow.planner.iso.subgraph.SubGraphIterator; 037import cascading.util.EnumMultiMap; 038import cascading.util.Pair; 039import org.jgrapht.GraphPath; 040 041import static cascading.flow.planner.graph.ElementGraphs.*; 042import static cascading.util.Util.getFirst; 043 044/** 045 * 046 */ 047public class UniquePathSubGraphIterator implements SubGraphIterator 048 { 049 SubGraphIterator subGraphIterator; 050 boolean longestFirst; 051 boolean multiEdge; 052 053 ElementGraph current = null; 054 Iterator<GraphPath<FlowElement, Scope>> pathsIterator; 055 Set<Pair<FlowElement, FlowElement>> pairs = new HashSet<>(); 056 057 public UniquePathSubGraphIterator( SubGraphIterator subGraphIterator, boolean longestFirst, boolean multiEdge ) 058 { 059 this.subGraphIterator = subGraphIterator; 060 this.longestFirst = longestFirst; 061 this.multiEdge = multiEdge; 062 } 063 064 public Set<Pair<FlowElement, FlowElement>> getPairs() 065 { 066 return pairs; 067 } 068 069 @Override 070 public ElementGraph getElementGraph() 071 { 072 return subGraphIterator.getElementGraph(); 073 } 074 075 @Override 076 public EnumMultiMap getAnnotationMap( ElementAnnotation[] annotations ) 077 { 078 return subGraphIterator.getAnnotationMap( annotations ); // unsure we need to narrow results 079 } 080 081 @Override 082 public boolean hasNext() 083 { 084 if( pathsIterator == null ) 085 advance(); 086 087 if( current == null || pathsIterator == null ) 088 return false; 089 090 boolean hasNextPath = pathsIterator.hasNext(); 091 092 if( hasNextPath ) 093 return true; 094 095 return subGraphIterator.hasNext(); 096 } 097 098 private void advance() 099 { 100 if( current == null ) 101 { 102 if( !subGraphIterator.hasNext() ) 103 return; 104 105 current = subGraphIterator.next(); 106 pathsIterator = null; 107 } 108 109 if( pathsIterator == null ) 110 { 111 Set<FlowElement> sources = findSources( current, FlowElement.class ); 112 Set<FlowElement> sinks = findSinks( current, FlowElement.class ); 113 114 if( sources.size() > 1 || sinks.size() > 1 ) 115 throw new IllegalArgumentException( "only supports single source and single sink graphs" ); 116 117 FlowElement source = getFirst( sources ); 118 FlowElement sink = getFirst( sinks ); 119 120 pairs.add( new Pair<>( source, sink ) ); 121 122 List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( current, source, sink ); 123 124 if( longestFirst ) 125 Collections.reverse( paths ); // break off longest paths into own partitions 126 127 pathsIterator = paths.iterator(); 128 } 129 } 130 131 @Override 132 public ElementGraph next() 133 { 134 if( !pathsIterator.hasNext() ) 135 { 136 current = null; 137 pathsIterator = null; 138 139 advance(); 140 141 return next(); 142 } 143 144 GraphPath<FlowElement, Scope> path = pathsIterator.next(); 145 List<FlowElement> vertexList = path.getVertexList(); 146 Collection<Scope> edgeList = path.getEdgeList(); 147 148 if( multiEdge ) 149 edgeList = getAllMultiEdgesBetween( edgeList, current ); 150 151 return new ElementSubGraph( current, vertexList, edgeList ); 152 } 153 154 @Override 155 public void remove() 156 { 157 subGraphIterator.remove(); 158 } 159 }