001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.process;
023
024import java.util.Collections;
025import java.util.Comparator;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030
031import cascading.flow.FlowElement;
032import cascading.flow.FlowNode;
033import cascading.flow.planner.BaseFlowNode;
034import cascading.flow.planner.BaseFlowNodeFactory;
035import cascading.flow.planner.graph.ElementGraph;
036import cascading.flow.planner.graph.FlowElementGraph;
037
038import static cascading.util.Util.createIdentitySet;
039
040/**
041 *
042 */
043public class FlowNodeGraph extends BaseProcessGraph<FlowNode>
044  {
045  public static final FlowNodeComparator FLOW_NODE_COMPARATOR = new FlowNodeComparator();
046
047  /**
048   * Class FlowNodeComparator provides a consistent tie breaker when ordering nodes topologically.
049   * <p>
050   * This should have no effect on submission and execution priority as all FlowNodes are submitted simultaneously.
051   */
052  public static class FlowNodeComparator implements Comparator<FlowNode>
053    {
054    @Override
055    public int compare( FlowNode lhs, FlowNode rhs )
056      {
057      // larger graph first
058      int lhsSize = lhs.getElementGraph().vertexSet().size();
059      int rhsSize = rhs.getElementGraph().vertexSet().size();
060      int result = ( lhsSize < rhsSize ) ? -1 : ( ( lhsSize == rhsSize ) ? 0 : 1 );
061
062      if( result != 0 )
063        return result;
064
065      // more inputs second
066      lhsSize = lhs.getSourceElements().size();
067      rhsSize = rhs.getSourceElements().size();
068
069      return ( lhsSize < rhsSize ) ? -1 : ( ( lhsSize == rhsSize ) ? 0 : 1 );
070      }
071    }
072
073  public FlowNodeGraph()
074    {
075    }
076
077  public FlowNodeGraph( FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs )
078    {
079    this( new BaseFlowNodeFactory(), flowElementGraph, nodeSubGraphs );
080    }
081
082  public FlowNodeGraph( FlowNodeFactory flowNodeFactory, List<? extends ElementGraph> nodeSubGraphs )
083    {
084    this( flowNodeFactory, null, nodeSubGraphs );
085    }
086
087  public FlowNodeGraph( FlowNodeFactory flowNodeFactory, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs )
088    {
089    this( flowNodeFactory, flowElementGraph, nodeSubGraphs, Collections.<ElementGraph, List<? extends ElementGraph>>emptyMap() );
090    }
091
092  public FlowNodeGraph( FlowNodeFactory flowNodeFactory, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap )
093    {
094    buildGraph( flowNodeFactory, flowElementGraph, nodeSubGraphs, pipelineSubGraphsMap );
095
096    // consistently sets ordinal of node based on topological dependencies and tie breaking by the given Comparator
097    Iterator<FlowNode> iterator = getOrderedTopologicalIterator();
098
099    int ordinal = 0;
100    int size = vertexSet().size();
101
102    while( iterator.hasNext() )
103      {
104      BaseFlowNode next = (BaseFlowNode) iterator.next();
105
106      next.setOrdinal( ordinal );
107      next.setName( flowNodeFactory.makeFlowNodeName( next, size, ordinal ) );
108
109      ordinal++;
110      }
111    }
112
113  protected void buildGraph( FlowNodeFactory flowNodeFactory, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap )
114    {
115    if( pipelineSubGraphsMap == null )
116      pipelineSubGraphsMap = Collections.emptyMap();
117
118    for( ElementGraph nodeSubGraph : nodeSubGraphs )
119      {
120      List<? extends ElementGraph> pipelineGraphs = pipelineSubGraphsMap.get( nodeSubGraph );
121
122      FlowNode flowNode = flowNodeFactory.createFlowNode( flowElementGraph, nodeSubGraph, pipelineGraphs );
123
124      addVertex( flowNode );
125      }
126
127    bindEdges();
128    }
129
130  public Set<FlowElement> getFlowElementsFor( Enum annotation )
131    {
132    Set<FlowElement> results = createIdentitySet();
133
134    for( FlowNode flowNode : vertexSet() )
135      results.addAll( flowNode.getFlowElementsFor( annotation ) );
136
137    return results;
138    }
139
140  public Iterator<FlowNode> getOrderedTopologicalIterator()
141    {
142    return super.getOrderedTopologicalIterator( FLOW_NODE_COMPARATOR );
143    }
144  }