001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.iso.subgraph.iterator;
023
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030
031import cascading.flow.FlowElement;
032import cascading.flow.planner.Scope;
033import cascading.flow.planner.graph.ElementGraph;
034import cascading.flow.planner.graph.ElementGraphs;
035import cascading.flow.planner.graph.ElementMaskSubGraph;
036import cascading.flow.planner.graph.Extent;
037import cascading.flow.planner.iso.ElementAnnotation;
038import cascading.flow.planner.iso.subgraph.SubGraphIterator;
039import cascading.util.EnumMultiMap;
040import cascading.util.Pair;
041import org.jgrapht.GraphPath;
042
043import static cascading.flow.planner.graph.ElementGraphs.*;
044import static cascading.util.Util.createIdentitySet;
045
046/**
047 *
048 */
049public class IncludeRemainderSubGraphIterator implements SubGraphIterator
050  {
051  SubGraphIterator parentIterator;
052  boolean multiEdge;
053
054  Set<FlowElement> maskedElements = createIdentitySet();
055  Set<Scope> maskedScopes = new HashSet<>();
056
057  {
058  // creates consistent results across SubGraphIterators
059  maskedElements.add( Extent.head );
060  maskedElements.add( Extent.tail );
061  }
062
063  public IncludeRemainderSubGraphIterator( SubGraphIterator parentIterator, boolean multiEdge )
064    {
065    this.parentIterator = parentIterator;
066    this.multiEdge = multiEdge;
067    }
068
069  @Override
070  public ElementGraph getElementGraph()
071    {
072    return parentIterator.getElementGraph();
073    }
074
075  @Override
076  public EnumMultiMap getAnnotationMap( ElementAnnotation[] annotations )
077    {
078    return parentIterator.getAnnotationMap( annotations );
079    }
080
081  @Override
082  public boolean hasNext()
083    {
084    return parentIterator.hasNext();
085    }
086
087  @Override
088  public ElementGraph next()
089    {
090    ElementGraph next = parentIterator.next();
091
092    if( parentIterator.hasNext() )
093      {
094      // hide these elements from the remainder graph
095      maskedElements.addAll( next.vertexSet() );
096      maskedScopes.addAll( next.edgeSet() ); // catches case with no elements on path
097
098      return next;
099      }
100
101    ElementGraph elementGraph = parentIterator.getElementGraph();
102
103    if( !multiEdge ) // no effect on mergepipes/tez
104      {
105      maskedElements.removeAll( next.vertexSet() );
106      maskedScopes.removeAll( next.edgeSet() );
107      }
108    else
109      {
110      // this is experimental, but was intended to allow capture of multiple edges between two
111      // nodes
112      maskedElements.addAll( next.vertexSet() );
113      maskedScopes.addAll( next.edgeSet() );
114
115      // if there is branching in the root graph, common ancestors could be masked out
116      // here we iterate all paths for all remaining paths
117
118      for( FlowElement maskedElement : new ArrayList<>( maskedElements ) )
119        {
120        if( !maskedScopes.containsAll( elementGraph.edgesOf( maskedElement ) ) )
121          maskedElements.remove( maskedElement );
122        }
123      }
124
125    // previously source/sink pairs captured in prior partitions
126    Set<Pair<FlowElement, FlowElement>> pairs = getPairs();
127
128    ElementMaskSubGraph maskSubGraph = new ElementMaskSubGraph( elementGraph, maskedElements, maskedScopes );
129
130    // remaining source/sink pairs we need to traverse
131    Set<FlowElement> sources = findSources( maskSubGraph, FlowElement.class );
132    Set<FlowElement> sinks = findSinks( maskSubGraph, FlowElement.class );
133
134    for( FlowElement source : sources )
135      {
136      for( FlowElement sink : sinks )
137        {
138        if( pairs.contains( new Pair<>( source, sink ) ) )
139          continue;
140
141        List<GraphPath<FlowElement, Scope>> paths = getAllShortestPathsBetween( elementGraph, source, sink );
142
143        for( GraphPath<FlowElement, Scope> path : paths )
144          {
145          maskedElements.removeAll( path.getVertexList() );
146
147          Collection<Scope> edgeList = path.getEdgeList();
148
149          if( multiEdge )
150            edgeList = ElementGraphs.getAllMultiEdgesBetween( edgeList, elementGraph );
151
152          maskedScopes.removeAll( edgeList );
153          }
154        }
155      }
156
157    // new graph since the prior made a copy of the masked vertices/edges
158    return new ElementMaskSubGraph( elementGraph, maskedElements, maskedScopes );
159    }
160
161  protected Set<Pair<FlowElement, FlowElement>> getPairs()
162    {
163    Set<Pair<FlowElement, FlowElement>> pairs = Collections.emptySet();
164
165    if( parentIterator instanceof UniquePathSubGraphIterator )
166      pairs = ( (UniquePathSubGraphIterator) parentIterator ).getPairs();
167
168    return pairs;
169    }
170
171  @Override
172  public void remove()
173    {
174    parentIterator.remove();
175    }
176  }