001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.process; 022 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026 027import cascading.flow.FlowElement; 028import cascading.flow.FlowStep; 029import cascading.flow.planner.BaseFlowStep; 030import cascading.flow.planner.graph.AnnotatedDecoratedElementGraph; 031import cascading.flow.planner.graph.ElementGraph; 032import cascading.flow.planner.graph.FlowElementGraph; 033import cascading.util.EnumMultiMap; 034 035public class FlowStepGraph extends BaseProcessGraph<FlowStep> 036 { 037 public FlowStepGraph() 038 { 039 } 040 041 public FlowStepGraph( FlowStepFactory flowStepFactory, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> nodeSubGraphsMap ) 042 { 043 this( flowStepFactory, flowElementGraph, nodeSubGraphsMap, null ); 044 } 045 046 public FlowStepGraph( FlowStepFactory flowStepFactory, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> nodeSubGraphsMap, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap ) 047 { 048 buildGraph( flowStepFactory, flowElementGraph, nodeSubGraphsMap, pipelineSubGraphsMap ); 049 050 Iterator<FlowStep> iterator = getTopologicalIterator(); 051 052 int ordinal = 0; 053 int size = vertexSet().size(); 054 055 while( iterator.hasNext() ) 056 { 057 BaseFlowStep flowStep = (BaseFlowStep) iterator.next(); 058 059 flowStep.setOrdinal( ordinal++ ); 060 flowStep.setName( flowStepFactory.makeFlowStepName( flowStep, size, flowStep.getOrdinal() ) ); 061 } 062 } 063 064 protected void buildGraph( FlowStepFactory flowStepFactory, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> nodeSubGraphsMap, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap ) 065 { 066 for( ElementGraph stepSubGraph : nodeSubGraphsMap.keySet() ) 067 { 068 List<? extends ElementGraph> nodeSubGraphs = nodeSubGraphsMap.get( stepSubGraph ); 069 FlowNodeGraph flowNodeGraph = createFlowNodeGraph( flowStepFactory, flowElementGraph, pipelineSubGraphsMap, nodeSubGraphs ); 070 071 EnumMultiMap<FlowElement> annotations = flowNodeGraph.getAnnotations(); 072 073 // pull up annotations 074 if( !annotations.isEmpty() ) 075 stepSubGraph = new AnnotatedDecoratedElementGraph( stepSubGraph, annotations ); 076 077 FlowStep flowStep = flowStepFactory.createFlowStep( stepSubGraph, flowNodeGraph ); 078 079 addVertex( flowStep ); 080 } 081 082 bindEdges(); 083 } 084 085 protected FlowNodeGraph createFlowNodeGraph( FlowStepFactory flowStepFactory, FlowElementGraph flowElementGraph, Map<ElementGraph, List<? extends ElementGraph>> pipelineSubGraphsMap, List<? extends ElementGraph> nodeSubGraphs ) 086 { 087 return new FlowNodeGraph( flowStepFactory.getFlowNodeFactory(), flowElementGraph, nodeSubGraphs, pipelineSubGraphsMap ); 088 } 089 }