001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.graph; 022 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.Map; 028import java.util.Set; 029 030import cascading.flow.FlowElement; 031import cascading.flow.FlowElements; 032import cascading.flow.planner.ElementGraphException; 033import cascading.flow.planner.PlatformInfo; 034import cascading.flow.planner.Scope; 035import cascading.pipe.Checkpoint; 036import cascading.pipe.Pipe; 037import cascading.pipe.Splice; 038import cascading.pipe.SubAssembly; 039import cascading.tap.Tap; 040import cascading.util.EnumMultiMap; 041import cascading.util.Util; 042import org.jgrapht.Graphs; 043import org.jgrapht.traverse.DepthFirstIterator; 044import org.jgrapht.traverse.TopologicalOrderIterator; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** Class ElementGraph represents the executable FlowElement graph. */ 049public class FlowElementGraph extends ElementDirectedGraph implements AnnotatedGraph 050 { 051 /** Field LOG */ 052 private static final Logger LOG = LoggerFactory.getLogger( FlowElementGraph.class ); 053 054 /** Field resolved */ 055 private boolean resolved; 056 /** Field platformInfo */ 057 protected PlatformInfo platformInfo; 058 /** Field sources */ 059 protected Map<String, Tap> sources; 060 /** Field sinks */ 061 protected Map<String, Tap> sinks; 062 /** Field traps */ 063 protected Map<String, Tap> traps; 064 /** Field checkpoints */ 065 protected Map<String, Tap> checkpoints; 066 /** Field requireUniqueCheckpoints */ 067 private boolean requireUniqueCheckpoints; 068 069 // used for creating isolated test graphs 070 protected FlowElementGraph() 071 { 072 } 073 074 public FlowElementGraph( FlowElementGraph flowElementGraph ) 075 { 076 this(); 077 this.platformInfo = flowElementGraph.platformInfo; 078 this.sources = flowElementGraph.sources; 079 this.sinks = flowElementGraph.sinks; 080 this.traps = flowElementGraph.traps; 081 this.checkpoints = flowElementGraph.checkpoints; 082 this.requireUniqueCheckpoints = flowElementGraph.requireUniqueCheckpoints; 083 084 if( flowElementGraph.annotations != null ) 085 this.annotations = new EnumMultiMap<>( flowElementGraph.annotations ); 086 087 copyFrom( flowElementGraph ); 088 } 089 090 public FlowElementGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks ) 091 { 092 this( null, pipes, sources, sinks, Collections.<String, Tap>emptyMap(), Collections.<String, Tap>emptyMap(), false ); 093 } 094 095 /** 096 * Constructor ElementGraph creates a new ElementGraph instance. 097 * 098 * @param pipes of type Pipe[] 099 * @param sources of type Map<String, Tap> 100 * @param sinks of type Map<String, Tap> 101 */ 102 public FlowElementGraph( PlatformInfo platformInfo, Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints, boolean requireUniqueCheckpoints ) 103 { 104 this(); 105 this.platformInfo = platformInfo; 106 this.sources = sources; 107 this.sinks = sinks; 108 this.traps = traps; 109 this.checkpoints = checkpoints; 110 this.requireUniqueCheckpoints = requireUniqueCheckpoints; 111 112 assembleGraph( pipes, sources, sinks ); 113 114 verifyGraph(); 115 } 116 117 public Map<String, Tap> getSourceMap() 118 { 119 return sources; 120 } 121 122 public Map<String, Tap> getSinkMap() 123 { 124 return sinks; 125 } 126 127 public Map<String, Tap> getTrapMap() 128 { 129 return traps; 130 } 131 132 public Map<String, Tap> getCheckpointsMap() 133 { 134 return checkpoints; 135 } 136 137 public Collection<Tap> getSources() 138 { 139 return sources.values(); 140 } 141 142 public Collection<Tap> getSinks() 143 { 144 return sinks.values(); 145 } 146 147 public Collection<Tap> getTraps() 148 { 149 return traps.values(); 150 } 151 152 protected void initialize( Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails ) 153 { 154 this.sources = sources; 155 this.sinks = sinks; 156 this.traps = Util.createHashMap(); 157 158 assembleGraph( tails, sources, sinks ); 159 160 verifyGraph(); 161 } 162 163 private void assembleGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks ) 164 { 165 HashMap<String, Tap> sourcesCopy = new HashMap<String, Tap>( sources ); 166 HashMap<String, Tap> sinksCopy = new HashMap<String, Tap>( sinks ); 167 168 for( Pipe pipe : pipes ) 169 makeGraph( pipe, sourcesCopy, sinksCopy ); 170 171 addExtents( sources, sinks ); 172 } 173 174 private void verifyGraph() 175 { 176 if( vertexSet().isEmpty() ) 177 return; 178 179 Set<String> checkpointNames = new HashSet<String>(); 180 181 // need to verify that only Extent instances are origins in this graph. Otherwise a Tap was not properly connected 182 TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator(); 183 184 FlowElement flowElement = null; 185 186 while( iterator.hasNext() ) 187 { 188 try 189 { 190 flowElement = iterator.next(); 191 } 192 catch( IllegalArgumentException exception ) 193 { 194 if( flowElement == null ) 195 throw new ElementGraphException( "unable to traverse to the first element" ); 196 197 throw new ElementGraphException( flowElement, "unable to traverse to the next element after " + flowElement ); 198 } 199 200 if( requireUniqueCheckpoints && flowElement instanceof Checkpoint ) 201 { 202 String name = ( (Checkpoint) flowElement ).getName(); 203 204 if( checkpointNames.contains( name ) ) 205 throw new ElementGraphException( (Pipe) flowElement, "may not have duplicate checkpoint names in assembly, found: " + name ); 206 207 checkpointNames.add( name ); 208 } 209 210 if( incomingEdgesOf( flowElement ).size() != 0 && outgoingEdgesOf( flowElement ).size() != 0 ) 211 continue; 212 213 if( flowElement instanceof Extent ) 214 continue; 215 216 if( flowElement instanceof Pipe ) 217 { 218 if( incomingEdgesOf( flowElement ).size() == 0 ) 219 throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to head Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" ); 220 else 221 throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to tail Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" ); 222 } 223 224 if( flowElement instanceof Tap ) 225 throw new ElementGraphException( (Tap) flowElement, "no Pipe connected to Tap: " + flowElement ); 226 else 227 throw new ElementGraphException( flowElement, "unknown element type: " + flowElement ); 228 } 229 } 230 231 protected FlowElementGraph shallowCopyElementGraph() 232 { 233 FlowElementGraph copy = new FlowElementGraph(); 234 Graphs.addGraph( copy.graph, this.graph ); 235 236 copy.traps = new HashMap<String, Tap>( this.traps ); 237 238 return copy; 239 } 240 241 public boolean isResolved() 242 { 243 return resolved; 244 } 245 246 public void setResolved( boolean resolved ) 247 { 248 this.resolved = resolved; 249 } 250 251 /** 252 * created to support the ability to generate all paths between the head and tail of the process. 253 * 254 * @param sources 255 * @param sinks 256 */ 257 private void addExtents( Map<String, Tap> sources, Map<String, Tap> sinks ) 258 { 259 addVertex( Extent.head ); 260 261 for( String source : sources.keySet() ) 262 { 263 Scope scope = addEdge( Extent.head, sources.get( source ) ); 264 265 // edge may already exist, if so, above returns null 266 if( scope != null ) 267 scope.setName( source ); 268 } 269 270 addVertex( Extent.tail ); 271 272 for( String sink : sinks.keySet() ) 273 { 274 Scope scope; 275 276 try 277 { 278 scope = addEdge( sinks.get( sink ), Extent.tail ); 279 } 280 catch( IllegalArgumentException exception ) 281 { 282 throw new ElementGraphException( "missing pipe for sink tap: [" + sink + "]" ); 283 } 284 285 if( scope == null ) 286 throw new ElementGraphException( "cannot sink to the same path from multiple branches: [" + Util.join( sinks.values() ) + "]" ); 287 288 scope.setName( sink ); 289 } 290 } 291 292 /** 293 * Performs one rule check, verifies group does not join duplicate tap resources. 294 * <p/> 295 * Scopes are always named after the source side of the source -> target relationship 296 */ 297 private void makeGraph( Pipe current, Map<String, Tap> sources, Map<String, Tap> sinks ) 298 { 299 LOG.debug( "adding pipe: {}", current ); 300 301 if( current instanceof SubAssembly ) 302 { 303 for( Pipe pipe : SubAssembly.unwind( current.getPrevious() ) ) 304 makeGraph( pipe, sources, sinks ); 305 306 return; 307 } 308 309 if( containsVertex( current ) ) 310 return; 311 312 addVertex( current ); 313 314 Tap sink = sinks.remove( current.getName() ); 315 316 if( sink != null ) 317 { 318 LOG.debug( "adding sink: {}", sink ); 319 320 addVertex( sink ); 321 322 LOG.debug( "adding edge: {} -> {}", current, sink ); 323 324 addEdge( current, sink ).setName( current.getName() ); 325 } 326 327 // PipeAssemblies should always have a previous 328 if( SubAssembly.unwind( current.getPrevious() ).length == 0 ) 329 { 330 Tap source = sources.remove( current.getName() ); 331 332 if( source != null ) 333 { 334 LOG.debug( "adding source: {}", source ); 335 336 addVertex( source ); 337 338 LOG.debug( "adding edge: {} -> {}", source, current ); 339 340 Scope scope = addEdge( source, current ); 341 342 scope.setName( current.getName() ); 343 344 setOrdinal( source, current, scope ); 345 } 346 } 347 348 for( Pipe previous : SubAssembly.unwind( current.getPrevious() ) ) 349 { 350 makeGraph( previous, sources, sinks ); 351 352 LOG.debug( "adding edge: {} -> ", previous, current ); 353 354 if( getEdge( previous, current ) != null ) 355 throw new ElementGraphException( previous, "cannot distinguish pipe branches, give pipe unique name: " + previous ); 356 357 Scope scope = addEdge( previous, current ); 358 359 scope.setName( previous.getName() ); // name scope after previous pipe 360 361 setOrdinal( previous, current, scope ); 362 } 363 } 364 365 private void setOrdinal( FlowElement previous, Pipe current, Scope scope ) 366 { 367 if( current instanceof Splice ) 368 { 369 Splice splice = (Splice) current; 370 371 Integer ordinal; 372 373 if( previous instanceof Tap ) // revert to pipe name 374 ordinal = splice.getPipePos().get( scope.getName() ); 375 else // GroupBy allows for duplicate pipe names, this guarantees correct ordinality 376 ordinal = FlowElements.findOrdinal( splice, (Pipe) previous ); 377 378 scope.setOrdinal( ordinal ); 379 380 Set<Scope> scopes = new HashSet<>( incomingEdgesOf( current ) ); 381 382 scopes.remove( scope ); 383 384 for( Scope other : scopes ) 385 { 386 if( other.getOrdinal() == scope.getOrdinal() ) 387 throw new IllegalStateException( "duplicate ordinals" ); 388 } 389 390 if( splice.isJoin() && ordinal != 0 ) 391 scope.setNonBlocking( false ); 392 } 393 } 394 395 /** 396 * Method getTopologicalIterator returns the topologicalIterator of this ElementGraph object. 397 * 398 * @return the topologicalIterator (type TopologicalOrderIterator<FlowElement, Scope>) of this ElementGraph object. 399 */ 400 public TopologicalOrderIterator<FlowElement, Scope> getTopologicalIterator() 401 { 402 return new TopologicalOrderIterator<>( this.graph ); 403 } 404 405 /** 406 * Method getDepthFirstIterator returns the depthFirstIterator of this ElementGraph object. 407 * 408 * @return the depthFirstIterator (type DepthFirstIterator<FlowElement, Scope>) of this ElementGraph object. 409 */ 410 public DepthFirstIterator<FlowElement, Scope> getDepthFirstIterator() 411 { 412 return new DepthFirstIterator<>( this.graph, Extent.head ); 413 } 414 415 private BaseElementGraph copyWithTraps() 416 { 417 FlowElementGraph copy = shallowCopyElementGraph(); 418 419 copy.addTrapsToGraph(); 420 421 return copy; 422 } 423 424 private void addTrapsToGraph() 425 { 426 DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator(); 427 428 while( iterator.hasNext() ) 429 { 430 FlowElement element = iterator.next(); 431 432 if( !( element instanceof Pipe ) ) 433 continue; 434 435 Pipe pipe = (Pipe) element; 436 Tap trap = traps.get( pipe.getName() ); 437 438 if( trap == null ) 439 continue; 440 441 addVertex( trap ); 442 443 if( LOG.isDebugEnabled() ) 444 LOG.debug( "adding trap edge: " + pipe + " -> " + trap ); 445 446 if( getEdge( pipe, trap ) != null ) 447 continue; 448 449 addEdge( pipe, trap ).setName( pipe.getName() ); // name scope after previous pipe 450 } 451 } 452 453 /** 454 * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging. 455 * 456 * @param filename of type String 457 */ 458 @Override 459 public void writeDOT( String filename ) 460 { 461 boolean success = ElementGraphs.printElementGraph( filename, this.copyWithTraps(), platformInfo ); 462 463 if( success ) 464 Util.writePDF( filename ); 465 } 466 467 @Override 468 public ElementGraph copyElementGraph() 469 { 470 return new FlowElementGraph( this ); 471 } 472 }