/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.stream.graph;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Set;

import cascading.flow.stream.duct.CloseReducingDuct;
import cascading.flow.stream.duct.Collapsing;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.DuctGraph;
import cascading.flow.stream.duct.Fork;
import cascading.flow.stream.duct.Gate;
import cascading.flow.stream.duct.OpenDuct;
import cascading.flow.stream.duct.OpenReducingDuct;
import cascading.flow.stream.duct.OpenWindow;
import cascading.flow.stream.duct.OrdinalDuct;
import cascading.flow.stream.duct.Reducing;
import cascading.flow.stream.duct.Stage;
import cascading.flow.stream.duct.Window;
import cascading.util.Util;
import org.jgrapht.DirectedGraph;
import org.jgrapht.EdgeFactory;
import org.jgrapht.Graphs;
import org.jgrapht.graph.DirectedMultigraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class StreamGraph is the operation pipeline used during processing. This is an internal use only class.
 * <p>
 * Under some circumstances it may make sense to see the actual graph plan. To do so, enable one or both dot file
 * properties, {@link #ERROR_DOT_FILE_NAME} and {@link #DOT_FILE_PATH}.
 */
public class StreamGraph
  {
  /** Property denoting the path and filename to write the failed stream graph dot file. */
  public final static String ERROR_DOT_FILE_NAME = "cascading.stream.error.dotfile";

  /**
   * Property denoting the path to write all stream graph dot files. The filename will be generated
   * based on platform properties.
   */
  public final static String DOT_FILE_PATH = "cascading.stream.dotfile.path";

  private static final Logger LOG = LoggerFactory.getLogger( StreamGraph.class );

  /** Marker vertex placed before every head duct; it is never handed out as a real duct. */
  private final Duct HEAD = new Extent( "head" );
  /** Marker vertex placed after every tail duct; it is never handed out as a real duct. */
  private final Duct TAIL = new Extent( "tail" );

  private final DuctGraph ductGraph = new DuctGraph();

  /**
   * Named marker duct used only for the {@link #HEAD} and {@link #TAIL} extents of the graph.
   * <p>
   * Declared static as it never reads state from the enclosing StreamGraph instance.
   */
  private static class Extent extends Stage
    {
    final String name;

    private Extent( String name )
      {
      this.name = name;
      }

    @Override
    public String toString()
      {
      return name;
      }
    }

  public StreamGraph()
    {
    }

  /**
   * Returns the value of the named property. This base implementation always returns {@code null},
   * which disables dot file output; sub-classes may override to supply values.
   *
   * @param name the property name, e.g. {@link #ERROR_DOT_FILE_NAME} or {@link #DOT_FILE_PATH}
   * @return the property value, or {@code null} if not set
   */
  protected Object getProperty( String name )
    {
    return null;
    }

  Duct getHEAD()
    {
    return HEAD;
    }

  Duct getTAIL()
    {
    return TAIL;
    }

  /**
   * Adds the given duct as a head, that is, as a direct successor of the HEAD marker.
   *
   * @param head the duct to register as a head
   */
  public void addHead( Duct head )
    {
    addPath( getHEAD(), head );
    }

  /**
   * Adds the given duct as a tail, that is, as a direct predecessor of the TAIL marker.
   *
   * @param tail the duct to register as a tail
   */
  public void addTail( Duct tail )
    {
    addPath( tail, getTAIL() );
    }

  /**
   * Adds an edge from lhs to rhs using ordinal zero.
   *
   * @param lhs the source duct, {@code null} denotes the HEAD marker
   * @param rhs the target duct, {@code null} denotes the TAIL marker
   * @see #addPath(Duct, int, Duct)
   */
  public void addPath( Duct lhs, Duct rhs )
    {
    addPath( lhs, 0, rhs );
    }

  /**
   * Adds both ducts as vertices, if not already present, and connects them with an edge carrying the given ordinal.
   * <p>
   * If the underlying graph rejects the change, the failure is logged, the error dot file is written
   * (see {@link #printGraphError()}), and the exception is re-thrown.
   *
   * @param lhs     the source duct, {@code null} denotes the HEAD marker
   * @param ordinal the ordinal to associate with the new edge
   * @param rhs     the target duct, {@code null} denotes the TAIL marker
   * @throws IllegalArgumentException if both lhs and rhs are {@code null}
   * @throws IllegalStateException    if lhs is the TAIL marker or rhs is the HEAD marker
   */
  public void addPath( Duct lhs, int ordinal, Duct rhs )
    {
    if( lhs == null && rhs == null )
      throw new IllegalArgumentException( "both lhs and rhs may not be null" );

    if( lhs == getTAIL() )
      throw new IllegalStateException( "lhs may not be a TAIL" );

    if( rhs == getHEAD() )
      throw new IllegalStateException( "rhs may not be a HEAD" );

    if( lhs == null )
      lhs = getHEAD();

    if( rhs == null )
      rhs = getTAIL();

    try
      {
      ductGraph.addVertex( lhs );
      ductGraph.addVertex( rhs );
      ductGraph.addEdge( lhs, rhs, ductGraph.makeOrdinal( ordinal ) );
      }
    catch( RuntimeException exception )
      {
      LOG.error( "unable to add path", exception );
      printGraphError();
      throw exception;
      }
    }

  /**
   * Calls {@code bind} on every duct in topological order, then calls {@code initialize} on every duct in
   * reversed topological order.
   */
  public void bind()
    {
    Iterator<Duct> iterator = getTopologicalOrderIterator();

    // build the actual processing graph
    while( iterator.hasNext() )
      iterator.next().bind( this );

    iterator = getReversedTopologicalOrderIterator();

    // initialize all the ducts
    while( iterator.hasNext() )
      iterator.next().initialize();
    }

  /** Calls prepare starting at the tail and working backwards */
  public void prepare()
    {
    TopologicalOrderIterator<Duct, Integer> iterator = getReversedTopologicalOrderIterator();

    while( iterator.hasNext() )
      iterator.next().prepare();
    }

  /** Calls cleanup starting at the head and working forwards */
  public void cleanup()
    {
    TopologicalOrderIterator<Duct, Integer> iterator = getTopologicalOrderIterator();

    while( iterator.hasNext() )
      iterator.next().cleanup();
    }

  /**
   * Returns the ducts that directly follow the HEAD marker.
   *
   * @return the successors of the HEAD marker
   */
  public Collection<Duct> getHeads()
    {
    return Graphs.successorListOf( ductGraph, getHEAD() );
    }

  /**
   * Returns the ducts that directly precede the TAIL marker.
   *
   * @return the predecessors of the TAIL marker
   */
  public Collection<Duct> getTails()
    {
    return Graphs.predecessorListOf( ductGraph, getTAIL() );
    }

  /**
   * Returns the target of every outgoing edge of the given duct, each passed through
   * {@link #wrapWithOrdinal(DuctGraph.Ordinal, Duct)}. The TAIL marker is omitted.
   *
   * @param current the duct to find successors for
   * @return the successors, possibly empty
   * @throws IllegalStateException if the HEAD marker is found as a successor
   */
  public Duct[] findAllNextFor( Duct current )
    {
    Set<DuctGraph.Ordinal> outgoing = ductGraph.outgoingEdgesOf( current );
    LinkedList<Duct> successors = new LinkedList<Duct>();

    for( DuctGraph.Ordinal edge : outgoing )
      {
      Duct successor = ductGraph.getEdgeTarget( edge );

      if( successor == getHEAD() )
        throw new IllegalStateException( "HEAD may not be next" );

      if( successor == getTAIL() ) // tail is not included, it's just a marker
        continue;

      successors.add( wrapWithOrdinal( edge, successor ) );
      }

    return successors.toArray( new Duct[ successors.size() ] );
    }

  /**
   * Returns the direct predecessors of the given duct. The HEAD marker is omitted.
   *
   * @param current the duct to find predecessors for
   * @return the predecessors, possibly empty
   * @throws IllegalStateException if the TAIL marker is found as a predecessor
   */
  public Duct[] findAllPreviousFor( Duct current )
    {
    LinkedList<Duct> predecessors = new LinkedList<Duct>( Graphs.predecessorListOf( ductGraph, current ) );
    ListIterator<Duct> iterator = predecessors.listIterator();

    while( iterator.hasNext() )
      {
      Duct predecessor = iterator.next();

      if( predecessor == getTAIL() )
        throw new IllegalStateException( "TAIL may not be previous" );

      if( predecessor == getHEAD() ) // head is not included, it's just a marker
        iterator.remove();
      }

    return predecessors.toArray( new Duct[ predecessors.size() ] );
    }

  /**
   * Creates the duct the given duct should forward to, based on the type of the given duct, the number of its
   * outgoing edges, and the type of the target of its first outgoing edge.
   * <ul>
   * <li>A {@link Gate} that is not a {@link Window} receives a fork of all successors when it has more than one
   * outgoing edge, otherwise its single successor wrapped by ordinal.</li>
   * <li>A {@link Gate} that is a {@link Window} receives its successor as-is when that is an {@link OpenWindow},
   * otherwise an open window around a fork, an open reducing window around a {@link Reducing} successor, or an
   * open window around the single successor.</li>
   * <li>A {@link Reducing} duct receives its successor as-is when that is also {@link Reducing}, otherwise a
   * close window around a fork or around the single successor.</li>
   * <li>Any other duct receives a fork when it has more than one outgoing edge, otherwise its single successor
   * wrapped by ordinal.</li>
   * </ul>
   *
   * @param current the duct to create the next duct for
   * @return the next duct, or {@code null} if current is the HEAD or TAIL marker
   * @throws IllegalStateException if current has no outgoing edge, or if a plain duct with a single
   *                               outgoing edge leads directly to the TAIL marker
   */
  public Duct createNextFor( Duct current )
    {
    if( current == getHEAD() || current == getTAIL() )
      return null;

    Set<DuctGraph.Ordinal> edges = ductGraph.outgoingEdgesOf( current );

    if( edges.isEmpty() )
      throw new IllegalStateException( "ducts must have an outgoing edge, current: " + current );

    DuctGraph.Ordinal edge = edges.iterator().next();
    Duct next = ductGraph.getEdgeTarget( edge );

    if( current instanceof Gate )
      {
      if( !( current instanceof Window ) ) // just collapse - Merge
        {
        if( edges.size() > 1 )
          return createFork( findAllNextFor( current ) );

        return wrapWithOrdinal( edge, next );
        }

      if( next instanceof OpenWindow )
        return next;

      if( edges.size() > 1 )
        return createOpenWindow( createFork( findAllNextFor( current ) ) );

      if( next instanceof Reducing )
        return createOpenReducingWindow( next );

      return createOpenWindow( wrapWithOrdinal( edge, next ) );
      }

    if( current instanceof Reducing )
      {
      if( next instanceof Reducing )
        return next;

      if( edges.size() > 1 )
        return createCloseWindow( createFork( findAllNextFor( current ) ) );

      return createCloseWindow( wrapWithOrdinal( edge, next ) );
      }

    if( edges.size() > 1 )
      return createFork( findAllNextFor( current ) );

    if( next == getTAIL() ) // tail is not included, it's just a marker
      throw new IllegalStateException( "tail ducts should not bind to next" );

    return wrapWithOrdinal( edge, next );
    }

  /**
   * Wraps the given duct in an {@link OrdinalDuct} carrying the ordinal of the given edge, but only when the
   * duct is {@link Collapsing}; any other duct is returned unchanged.
   *
   * @param edge the edge leading to next
   * @param next the duct to possibly wrap
   * @return the wrapped duct, or next itself
   */
  protected Duct wrapWithOrdinal( DuctGraph.Ordinal edge, Duct next )
    {
    if( next instanceof Collapsing )
      next = new OrdinalDuct( next, edge.getOrdinal() );

    return next;
    }

  /**
   * Creates a new {@link CloseReducingDuct} in front of the given duct.
   *
   * @param next the duct to forward to
   * @return the new duct
   */
  protected Duct createCloseWindow( Duct next )
    {
    return new CloseReducingDuct( next );
    }

  /**
   * Creates a new {@link OpenDuct} in front of the given duct.
   *
   * @param next the duct to forward to
   * @return the new duct
   */
  protected Duct createOpenWindow( Duct next )
    {
    return new OpenDuct( next );
    }

  /**
   * Creates a new {@link OpenReducingDuct} in front of the given duct.
   *
   * @param next the duct to forward to
   * @return the new duct
   */
  protected Duct createOpenReducingWindow( Duct next )
    {
    return new OpenReducingDuct( next );
    }

  /**
   * Creates a new {@link Fork} over all the given ducts.
   *
   * @param allNext the ducts to fork to
   * @return the new fork
   */
  protected Duct createFork( Duct[] allNext )
    {
    return new Fork( allNext );
    }

  /**
   * Returns the ordinal of the edge connecting lhs to rhs.
   *
   * @param lhs the source duct
   * @param rhs the target duct
   * @return the ordinal of the connecting edge
   */
  public int ordinalBetween( Duct lhs, Duct rhs )
    {
    // NOTE(review): presumably getEdge returns null when the ducts are not connected, which would surface
    // here as a NullPointerException - confirm against DuctGraph
    return ductGraph.getEdge( lhs, rhs ).getOrdinal();
    }

  /**
   * Creates an iterator over all vertices, including the HEAD and TAIL markers, in topological order.
   * <p>
   * On failure the error is logged, the error dot file is written (see {@link #printGraphError()}), and the
   * exception is re-thrown.
   *
   * @return a new iterator
   */
  public TopologicalOrderIterator<Duct, Integer> getTopologicalOrderIterator()
    {
    try
      {
      // the raw type is intentional, the edges are DuctGraph.Ordinal instances, not the declared Integer
      return new TopologicalOrderIterator( ductGraph );
      }
    catch( RuntimeException exception )
      {
      LOG.error( "failed creating topological iterator", exception );
      printGraphError();

      throw exception;
      }
    }

  /**
   * Creates an iterator over all vertices, including the HEAD and TAIL markers, in the topological order
   * of the reversed graph.
   * <p>
   * On failure the error is logged, the error dot file is written (see {@link #printGraphError()}), and the
   * exception is re-thrown.
   *
   * @return a new iterator
   */
  public TopologicalOrderIterator<Duct, Integer> getReversedTopologicalOrderIterator()
    {
    try
      {
      // the raw type is intentional, see getTopologicalOrderIterator
      return new TopologicalOrderIterator( getReversedGraph() );
      }
    catch( RuntimeException exception )
      {
      LOG.error( "failed creating reversed topological iterator", exception );
      printGraphError();

      throw exception;
      }
    }

  /**
   * Creates a new graph holding the same vertices as this graph with the direction of every edge reversed.
   * A new graph is created on every call.
   *
   * @return the reversed copy
   */
  public DirectedGraph getReversedGraph()
    {
    DuctGraph reversedGraph = new DuctGraph();

    Graphs.addGraphReversed( reversedGraph, ductGraph );

    return reversedGraph;
    }

  /**
   * Returns the vertex set of the underlying graph. This includes the HEAD and TAIL markers once any path
   * touching them has been added.
   *
   * @return all vertices of the graph
   */
  public Collection<Duct> getAllDucts()
    {
    return ductGraph.vertexSet();
    }

  /**
   * Writes the graph to the file named by the {@link #ERROR_DOT_FILE_NAME} property. Does nothing if the
   * property is not set.
   */
  public void printGraphError()
    {
    String filename = (String) getProperty( ERROR_DOT_FILE_NAME );

    if( filename == null )
      return;

    printGraph( filename );
    }

  /**
   * Writes the graph to {@code streamgraph-<id>-<classifier>-<discriminator>.dot} under the path named by the
   * {@link #DOT_FILE_PATH} property. Does nothing if the property is not set.
   *
   * @param id            the first component of the file name
   * @param classifier    the second component of the file name, cleansed via {@code Util.cleansePathName}
   * @param discriminator the third component of the file name
   */
  public void printGraph( String id, String classifier, int discriminator )
    {
    String path = (String) getProperty( DOT_FILE_PATH );

    if( path == null )
      return;

    classifier = Util.cleansePathName( classifier );

    path = String.format( "%s/streamgraph-%s-%s-%s.dot", path, id, classifier, discriminator );

    printGraph( path );
    }

  /**
   * Writes the graph, as declared via {@link #addPath(Duct, int, Duct)}, to the given file.
   *
   * @param filename the file to write to
   */
  public void printGraph( String filename )
    {
    LOG.info( "writing stream graph to {}", filename );
    Util.printGraph( filename, ductGraph );
    }

  /**
   * Writes the bound graph to {@code streamgraph-bound-<id>-<classifier>-<discriminator>.dot} under the path
   * named by the {@link #DOT_FILE_PATH} property. Does nothing if the property is not set.
   *
   * @param id            the first component of the file name
   * @param classifier    the second component of the file name, cleansed via {@code Util.cleansePathName}
   * @param discriminator the third component of the file name
   */
  public void printBoundGraph( String id, String classifier, int discriminator )
    {
    String path = (String) getProperty( DOT_FILE_PATH );

    if( path == null )
      return;

    classifier = Util.cleansePathName( classifier );

    path = String.format( "%s/streamgraph-bound-%s-%s-%s.dot", path, id, classifier, discriminator );

    printBoundGraph( path );
    }

  /**
   * Writes the bound graph to the given file. The bound graph is discovered by visiting every duct in
   * topological order and following the next duct(s) each duct reports, skipping the HEAD and TAIL markers.
   *
   * @param filename the file to write to
   */
  public void printBoundGraph( String filename )
    {
    LOG.info( "writing stream bound graph to {}", filename );

    // edges are plain sequence numbers, one new value per created edge
    DirectedMultigraph<Duct, Integer> graph = new DirectedMultigraph<>( new EdgeFactory<Duct, Integer>()
      {
      int count = 0;

      @Override
      public Integer createEdge( Duct sourceVertex, Duct targetVertex )
        {
        return count++;
        }
      } );

    TopologicalOrderIterator<Duct, Integer> iterator = getTopologicalOrderIterator();

    while( iterator.hasNext() )
      {
      Duct previous = iterator.next();

      // vertices already present were reached, and followed, from an earlier duct
      if( graph.containsVertex( previous ) || previous instanceof Extent )
        continue;

      graph.addVertex( previous );
      addNext( graph, previous );
      }

    Util.printGraph( filename, graph );
    }

  /**
   * Adds an edge from the given duct to each duct it reports as next, a {@link Fork} reporting many and any
   * other duct reporting one, then recursively follows each newly connected duct.
   *
   * @param graph    the graph being populated
   * @param previous the duct to follow, must already be a vertex of graph
   */
  private void addNext( DirectedMultigraph<Duct, Integer> graph, Duct previous )
    {
    if( previous instanceof Fork )
      {
      for( Duct next : ( (Fork) previous ).getAllNext() )
        addEdgeAndFollow( graph, previous, next );

      return;
      }

    addEdgeAndFollow( graph, previous, previous.getNext() );
    }

  /**
   * Connects previous to next and follows next, unless next is {@code null}, a marker, or already connected.
   *
   * @param graph    the graph being populated
   * @param previous the source duct, must already be a vertex of graph
   * @param next     the target duct, may be {@code null}
   */
  private void addEdgeAndFollow( DirectedMultigraph<Duct, Integer> graph, Duct previous, Duct next )
    {
    if( next == null || next instanceof Extent )
      return;

    graph.addVertex( next );

    // an existing edge means next was already followed from previous, stop here to avoid repeating the walk
    if( graph.containsEdge( previous, next ) )
      return;

    graph.addEdge( previous, next );

    addNext( graph, next );
    }
  }