001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.process;
022
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029
030import cascading.flow.FlowElement;
031import cascading.flow.FlowNode;
032import cascading.flow.planner.BaseFlowNode;
033import cascading.flow.planner.BaseFlowNodeFactory;
034import cascading.flow.planner.graph.ElementGraph;
035import cascading.flow.planner.graph.FlowElementGraph;
036
037import static cascading.util.Util.createIdentitySet;
038
039/**
040 *
041 */
042public class FlowNodeGraph extends BaseProcessGraph<FlowNode>
043  {
044  public static final FlowNodeComparator FLOW_NODE_COMPARATOR = new FlowNodeComparator();
045
046  /**
047   * Class FlowNodeComparator provides a consistent tie breaker when ordering nodes topologically.
048   * <p/>
049   * This should have no effect on submission and execution priority as all FlowNodes are submitted simultaneously.
050   */
051  public static class FlowNodeComparator implements Comparator<FlowNode>
052    {
053    @Override
054    public int compare( FlowNode lhs, FlowNode rhs )
055      {
056      // larger graph first
057      int lhsSize = lhs.getElementGraph().vertexSet().size();
058      int rhsSize = rhs.getElementGraph().vertexSet().size();
059      int result = ( lhsSize < rhsSize ) ? -1 : ( ( lhsSize == rhsSize ) ? 0 : 1 );
060
061      if( result != 0 )
062        return result;
063
064      // more inputs second
065      lhsSize = lhs.getSourceElements().size();
066      rhsSize = rhs.getSourceElements().size();
067
068      return ( lhsSize < rhsSize ) ? -1 : ( ( lhsSize == rhsSize ) ? 0 : 1 );
069      }
070    }
071
072  public FlowNodeGraph()
073    {
074    }
075
076  public FlowNodeGraph( FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs )
077    {
078    this( new BaseFlowNodeFactory(), flowElementGraph, nodeSubGraphs );
079    }
080
081  public FlowNodeGraph( FlowNodeFactory flowNodeFactory, List<? extends ElementGraph> nodeSubGraphs )
082    {
083    this( flowNodeFactory, null, nodeSubGraphs );
084    }
085
086  public FlowNodeGraph( FlowNodeFactory flowNodeFactory, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs )
087    {
088    this( flowNodeFactory, flowElementGraph, nodeSubGraphs, Collections.<ElementGraph, List<? extends ElementGraph>>emptyMap() );
089    }
090
091  public FlowNodeGraph( FlowNodeFactory flowNodeFactory, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap )
092    {
093    buildGraph( flowNodeFactory, flowElementGraph, nodeSubGraphs, pipelineSubGraphsMap );
094
095    // consistently sets ordinal of node based on topological dependencies and tie breaking by the given Comparator
096    Iterator<FlowNode> iterator = getOrderedTopologicalIterator();
097
098    int ordinal = 0;
099    int size = vertexSet().size();
100
101    while( iterator.hasNext() )
102      {
103      BaseFlowNode next = (BaseFlowNode) iterator.next();
104
105      next.setOrdinal( ordinal );
106      next.setName( flowNodeFactory.makeFlowNodeName( next, size, ordinal ) );
107
108      ordinal++;
109      }
110    }
111
112  protected void buildGraph( FlowNodeFactory flowNodeFactory, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap )
113    {
114    if( pipelineSubGraphsMap == null )
115      pipelineSubGraphsMap = Collections.emptyMap();
116
117    for( ElementGraph nodeSubGraph : nodeSubGraphs )
118      {
119      List<? extends ElementGraph> pipelineGraphs = pipelineSubGraphsMap.get( nodeSubGraph );
120
121      FlowNode flowNode = flowNodeFactory.createFlowNode( flowElementGraph, nodeSubGraph, pipelineGraphs );
122
123      addVertex( flowNode );
124      }
125
126    bindEdges();
127    }
128
129  public Set<FlowElement> getFlowElementsFor( Enum annotation )
130    {
131    Set<FlowElement> results = createIdentitySet();
132
133    for( FlowNode flowNode : vertexSet() )
134      results.addAll( flowNode.getFlowElementsFor( annotation ) );
135
136    return results;
137    }
138
139  public Iterator<FlowNode> getOrderedTopologicalIterator()
140    {
141    return super.getOrderedTopologicalIterator( FLOW_NODE_COMPARATOR );
142    }
143  }