001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.planner; 022 023 import java.io.FileWriter; 024 import java.io.IOException; 025 import java.io.Writer; 026 import java.util.Comparator; 027 import java.util.List; 028 import java.util.Map; 029 import java.util.PriorityQueue; 030 import java.util.Set; 031 032 import cascading.flow.FlowElement; 033 import cascading.flow.FlowStep; 034 import cascading.pipe.Group; 035 import cascading.tap.Tap; 036 import cascading.util.Util; 037 import org.jgrapht.GraphPath; 038 import org.jgrapht.Graphs; 039 import org.jgrapht.ext.IntegerNameProvider; 040 import org.jgrapht.ext.VertexNameProvider; 041 import org.jgrapht.graph.SimpleDirectedGraph; 042 import org.jgrapht.traverse.TopologicalOrderIterator; 043 import org.slf4j.Logger; 044 import org.slf4j.LoggerFactory; 045 046 /** Class StepGraph is an internal representation of {@link FlowStep} instances. */ 047 public abstract class FlowStepGraph<Config> extends SimpleDirectedGraph<FlowStep<Config>, Integer> 048 { 049 /** Field LOG */ 050 private static final Logger LOG = LoggerFactory.getLogger( FlowStepGraph.class ); 051 052 /** Constructor StepGraph creates a new StepGraph instance. */ 053 public FlowStepGraph() 054 { 055 super( Integer.class ); 056 } 057 058 /** 059 * Constructor StepGraph creates a new StepGraph instance. 060 * 061 * @param elementGraph of type ElementGraph 062 */ 063 public FlowStepGraph( String flowName, ElementGraph elementGraph ) 064 { 065 this(); 066 067 makeStepGraph( flowName, elementGraph ); 068 } 069 070 /** 071 * Method getCreateFlowStep ... 072 * 073 * @param steps of type Map<String, FlowStep> 074 * @param sink of type String 075 * @param numJobs of type int 076 * @return FlowStep 077 */ 078 protected FlowStep<Config> getCreateFlowStep( Map<Tap, FlowStep<Config>> steps, Tap sink, int numJobs ) 079 { 080 if( steps.containsKey( sink ) ) 081 return steps.get( sink ); 082 083 LOG.debug( "creating step: {}", sink ); 084 085 int stepNum = steps.size() + 1; 086 String stepName = makeStepName( sink, numJobs, stepNum ); 087 FlowStep<Config> step = createFlowStep( stepName, stepNum ); 088 089 steps.put( sink, step ); 090 091 return step; 092 } 093 094 protected abstract FlowStep<Config> createFlowStep( String stepName, int stepNum ); 095 096 private String makeStepName( Tap sink, int numJobs, int stepNum ) 097 { 098 if( sink.isTemporary() ) 099 return String.format( "(%d/%d)", stepNum, numJobs ); 100 101 String identifier = sink.getIdentifier(); 102 103 if( identifier.length() > 25 ) 104 identifier = String.format( "...%25s", identifier.substring( identifier.length() - 25 ) ); 105 106 return String.format( "(%d/%d) %s", stepNum, numJobs, identifier ); 107 } 108 109 protected abstract void makeStepGraph( String flowName, ElementGraph elementGraph ); 110 111 protected boolean pathContainsTap( GraphPath<FlowElement, Scope> path ) 112 { 113 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); 114 115 // first and last are taps, if we find more than 2, return false 116 int count = 0; 117 118 for( FlowElement flowElement : flowElements ) 119 { 120 if( flowElement instanceof Tap ) 121 count++; 122 } 123 124 return count > 2; 125 } 126 127 public TopologicalOrderIterator<FlowStep<Config>, Integer> getTopologicalIterator() 128 { 129 return new TopologicalOrderIterator<FlowStep<Config>, Integer>( this, new PriorityQueue<FlowStep<Config>>( 10, new Comparator<FlowStep<Config>>() 130 { 131 @Override 132 public int compare( FlowStep<Config> lhs, FlowStep<Config> rhs ) 133 { 134 return Integer.valueOf( lhs.getSubmitPriority() ).compareTo( rhs.getSubmitPriority() ); 135 } 136 } ) ); 137 } 138 139 /** 140 * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging. 141 * 142 * @param filename of type String 143 */ 144 public void writeDOT( String filename ) 145 { 146 printElementGraph( filename ); 147 } 148 149 protected void printElementGraph( String filename ) 150 { 151 try 152 { 153 Writer writer = new FileWriter( filename ); 154 155 Util.writeDOT( writer, this, new IntegerNameProvider<BaseFlowStep>(), new VertexNameProvider<FlowStep>() 156 { 157 public String getVertexName( FlowStep flowStep ) 158 { 159 String sourceName = ""; 160 161 for( Object object : flowStep.getSources() ) 162 { 163 Tap source = (Tap) object; 164 165 if( source.isTemporary() ) 166 continue; 167 168 sourceName += "[" + source.getIdentifier() + "]"; 169 } 170 171 String name = "[" + flowStep.getName() + "]"; 172 173 if( sourceName.length() != 0 ) 174 name += "\\nsrc:" + sourceName; 175 176 177 List<Group> groups = flowStep.getGroups(); 178 179 for( Group group : groups ) 180 { 181 String groupName = group.getName(); 182 183 if( groupName.length() != 0 ) 184 name += "\\ngrp:" + groupName; 185 } 186 187 Set<Tap> sinks = flowStep.getSinks(); 188 189 for( Tap sink : sinks ) 190 { 191 String sinkName = sink.isTemporary() ? "" : "[" + sink.getIdentifier() + "]"; 192 if( sinkName.length() != 0 ) 193 name += "\\nsnk:" + sinkName; 194 } 195 196 return name.replaceAll( "\"", "\'" ); 197 } 198 }, null ); 199 200 writer.close(); 201 } 202 catch( IOException exception ) 203 { 204 LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception ); 205 } 206 } 207 208 }