/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner.process;

import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.FlowNode;
import cascading.flow.planner.BaseFlowNode;
import cascading.flow.planner.BaseFlowNodeFactory;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.FlowElementGraph;

import static cascading.util.Util.createIdentitySet;

/**
 * Class FlowNodeGraph is a process graph whose vertices are {@link FlowNode} instances.
 * <p>
 * One FlowNode is created per given node sub-graph through a {@link FlowNodeFactory}. After the
 * graph is built, every node is assigned an ordinal and a name while walking the graph with
 * {@link #getOrderedTopologicalIterator()}.
 */
public class FlowNodeGraph extends BaseProcessGraph<FlowNode>
  {
  /** Shared comparator instance handed to the ordered topological iterator. */
  public static final FlowNodeComparator FLOW_NODE_COMPARATOR = new FlowNodeComparator();

  /**
   * Class FlowNodeComparator provides a consistent tie breaker when ordering nodes topologically.
   * <p>
   * This should have no effect on submission and execution priority as all FlowNodes are submitted simultaneously.
   * <p>
   * Nodes are compared first by the number of vertices in their element graph, and, when those are
   * equal, by the number of their source elements. Both comparisons are in ascending numeric order.
   * <p>
   * NOTE(review): earlier inline comments described the two keys as "larger graph first" and
   * "more inputs second", yet the comparison itself is ascending. Confirm how the consuming
   * iterator interprets this ordering before changing either the code or the wording.
   */
  public static class FlowNodeComparator implements Comparator<FlowNode>
    {
    /**
     * Compares two nodes by element graph vertex count, then by source element count.
     *
     * @param lhs the left hand node
     * @param rhs the right hand node
     * @return -1, 0 or 1 as lhs sorts before, equal to, or after rhs
     */
    @Override
    public int compare( FlowNode lhs, FlowNode rhs )
      {
      // primary key: number of vertices in each node's element graph
      int byGraphSize = compareCounts( lhs.getElementGraph().vertexSet().size(), rhs.getElementGraph().vertexSet().size() );

      if( byGraphSize != 0 )
        return byGraphSize;

      // secondary key: number of source elements, only consulted on a primary tie
      return compareCounts( lhs.getSourceElements().size(), rhs.getSourceElements().size() );
      }

    /** Three-way comparison of two counts yielding exactly -1, 0 or 1. */
    private static int compareCounts( int left, int right )
      {
      if( left < right )
        return -1;

      return left == right ? 0 : 1;
      }
    }

  /**
   * Creates an empty instance; no graph is built and no ordinals are assigned.
   */
  public FlowNodeGraph()
    {
    }

  /**
   * Creates a graph from the given node sub-graphs using a new {@link BaseFlowNodeFactory}.
   *
   * @param flowElementGraph the element graph passed to the factory for every node
   * @param nodeSubGraphs    the sub-graphs, one per FlowNode to create
   */
  public FlowNodeGraph( FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs )
    {
    this( new BaseFlowNodeFactory(), flowElementGraph, nodeSubGraphs );
    }

  /**
   * Creates a graph from the given node sub-graphs, passing {@code null} as the flow element graph.
   *
   * @param flowNodeFactory the factory used to create and name each FlowNode
   * @param nodeSubGraphs   the sub-graphs, one per FlowNode to create
   */
  public FlowNodeGraph( FlowNodeFactory flowNodeFactory, List<? extends ElementGraph> nodeSubGraphs )
    {
    this( flowNodeFactory, null, nodeSubGraphs );
    }

  /**
   * Creates a graph from the given node sub-graphs with an empty pipeline sub-graph map.
   *
   * @param flowNodeFactory  the factory used to create and name each FlowNode
   * @param flowElementGraph the element graph passed to the factory for every node
   * @param nodeSubGraphs    the sub-graphs, one per FlowNode to create
   */
  public FlowNodeGraph( FlowNodeFactory flowNodeFactory, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs )
    {
    this( flowNodeFactory, flowElementGraph, nodeSubGraphs, Collections.<ElementGraph, List<? extends ElementGraph>>emptyMap() );
    }

  /**
   * Builds the graph, then assigns each node an ordinal and a name.
   * <p>
   * Ordinals start at zero and follow the order returned by {@link #getOrderedTopologicalIterator()}.
   * Names are produced by {@link FlowNodeFactory#makeFlowNodeName} from the node, the total number
   * of vertices and the node's ordinal.
   * <p>
   * Every node returned by the iterator is cast to {@link BaseFlowNode}.
   *
   * @param flowNodeFactory      the factory used to create and name each FlowNode
   * @param flowElementGraph     the element graph passed to the factory for every node
   * @param nodeSubGraphs        the sub-graphs, one per FlowNode to create
   * @param pipelineSubGraphsMap pipeline sub-graphs keyed by node sub-graph, may be {@code null}
   */
  public FlowNodeGraph( FlowNodeFactory flowNodeFactory, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap )
    {
    buildGraph( flowNodeFactory, flowElementGraph, nodeSubGraphs, pipelineSubGraphsMap );

    // consistently sets ordinal of node based on topological dependencies and tie breaking by the given Comparator
    Iterator<FlowNode> nodes = getOrderedTopologicalIterator();
    int total = vertexSet().size();

    for( int ordinal = 0; nodes.hasNext(); ordinal++ )
      {
      BaseFlowNode node = (BaseFlowNode) nodes.next();

      node.setOrdinal( ordinal );
      node.setName( flowNodeFactory.makeFlowNodeName( node, total, ordinal ) );
      }
    }

  /**
   * Creates one FlowNode per node sub-graph, adds each as a vertex, then calls {@code bindEdges()}.
   * <p>
   * The pipeline graphs handed to the factory for a node are looked up in the given map by the
   * node's sub-graph; the lookup result is passed through as-is, including {@code null} when the
   * map has no entry.
   *
   * @param flowNodeFactory      the factory used to create each FlowNode
   * @param flowElementGraph     the element graph passed to the factory for every node
   * @param nodeSubGraphs        the sub-graphs, one per FlowNode to create
   * @param pipelineSubGraphsMap pipeline sub-graphs keyed by node sub-graph; {@code null} is
   *                             treated as an empty map
   */
  protected void buildGraph( FlowNodeFactory flowNodeFactory, FlowElementGraph flowElementGraph, List<? extends ElementGraph> nodeSubGraphs, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap )
    {
    final Map<ElementGraph, List<? extends ElementGraph>> pipelinesByNode = pipelineSubGraphsMap != null
      ? pipelineSubGraphsMap
      : Collections.<ElementGraph, List<? extends ElementGraph>>emptyMap();

    for( ElementGraph nodeSubGraph : nodeSubGraphs )
      addVertex( flowNodeFactory.createFlowNode( flowElementGraph, nodeSubGraph, pipelinesByNode.get( nodeSubGraph ) ) );

    // invoked once, after all vertices are present
    bindEdges();
    }

  /**
   * Collects the flow elements carrying the given annotation across all nodes in this graph.
   * <p>
   * The result is the set created by {@code Util.createIdentitySet()}, filled with the elements
   * each node reports for the annotation.
   *
   * @param annotation the annotation to look up on every node
   * @return a new set holding the matching elements of all nodes
   */
  public Set<FlowElement> getFlowElementsFor( Enum annotation )
    {
    Set<FlowElement> collected = createIdentitySet();
    Iterator<FlowNode> nodes = vertexSet().iterator();

    while( nodes.hasNext() )
      collected.addAll( nodes.next().getFlowElementsFor( annotation ) );

    return collected;
    }

  /**
   * Returns the ordered topological iterator of the parent class, using
   * {@link #FLOW_NODE_COMPARATOR} as the comparator.
   *
   * @return an iterator over the nodes of this graph
   */
  public Iterator<FlowNode> getOrderedTopologicalIterator()
    {
    return super.getOrderedTopologicalIterator( FLOW_NODE_COMPARATOR );
    }
  }