001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.graph;
023
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
029
030import cascading.flow.stream.duct.CloseReducingDuct;
031import cascading.flow.stream.duct.Collapsing;
032import cascading.flow.stream.duct.Duct;
033import cascading.flow.stream.duct.DuctGraph;
034import cascading.flow.stream.duct.Fork;
035import cascading.flow.stream.duct.Gate;
036import cascading.flow.stream.duct.OpenDuct;
037import cascading.flow.stream.duct.OpenReducingDuct;
038import cascading.flow.stream.duct.OpenWindow;
039import cascading.flow.stream.duct.OrdinalDuct;
040import cascading.flow.stream.duct.Reducing;
041import cascading.flow.stream.duct.Stage;
042import cascading.flow.stream.duct.Window;
043import cascading.util.Util;
044import org.jgrapht.DirectedGraph;
045import org.jgrapht.EdgeFactory;
046import org.jgrapht.Graphs;
047import org.jgrapht.graph.DirectedMultigraph;
048import org.jgrapht.traverse.TopologicalOrderIterator;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
 * Class StreamGraph is the operation pipeline used during processing. This is an internal-use-only class.
054 * <p/>
055 * Under some circumstances it may make sense to see the actual graph plan. To do so, enable one or both dot file
056 * properties, {@link #ERROR_DOT_FILE_NAME} and {@link #DOT_FILE_PATH}.
057 */
058public class StreamGraph
059  {
060  /** Property denoting the path and filename to write the failed stream graph dot file. */
061  public final static String ERROR_DOT_FILE_NAME = "cascading.stream.error.dotfile";
062
063  /**
064   * Property denoting the path to write all stream graph dot files. The filename will be generated
065   * based on platform properties.
066   */
067  public final static String DOT_FILE_PATH = "cascading.stream.dotfile.path";
068
069  private static final Logger LOG = LoggerFactory.getLogger( StreamGraph.class );
070
071  private final Duct HEAD = new Extent( "head" );
072  private final Duct TAIL = new Extent( "tail" );
073
074  private final DuctGraph ductGraph = new DuctGraph();
075
076  private class Extent extends Stage
077    {
078    final String name;
079
080    private Extent( String name )
081      {
082      this.name = name;
083      }
084
085    @Override
086    public String toString()
087      {
088      return name;
089      }
090    }
091
092  public StreamGraph()
093    {
094    }
095
096  protected Object getProperty( String name )
097    {
098    return null;
099    }
100
101  Duct getHEAD()
102    {
103    return HEAD;
104    }
105
106  Duct getTAIL()
107    {
108    return TAIL;
109    }
110
111  public void addHead( Duct head )
112    {
113    addPath( getHEAD(), head );
114    }
115
116  public void addTail( Duct tail )
117    {
118    addPath( tail, getTAIL() );
119    }
120
121  public void addPath( Duct lhs, Duct rhs )
122    {
123    addPath( lhs, 0, rhs );
124    }
125
126  public void addPath( Duct lhs, int ordinal, Duct rhs )
127    {
128    if( lhs == null && rhs == null )
129      throw new IllegalArgumentException( "both lhs and rhs may not be null" );
130
131    if( lhs == getTAIL() )
132      throw new IllegalStateException( "lhs may not be a TAIL" );
133
134    if( rhs == getHEAD() )
135      throw new IllegalStateException( "rhs may not be a HEAD" );
136
137    if( lhs == null )
138      lhs = getHEAD();
139
140    if( rhs == null )
141      rhs = getTAIL();
142
143    try
144      {
145      ductGraph.addVertex( lhs );
146      ductGraph.addVertex( rhs );
147      ductGraph.addEdge( lhs, rhs, ductGraph.makeOrdinal( ordinal ) );
148      }
149    catch( RuntimeException exception )
150      {
151      LOG.error( "unable to add path", exception );
152      printGraphError();
153      throw exception;
154      }
155    }
156
157  public void bind()
158    {
159    Iterator<Duct> iterator = getTopologicalOrderIterator();
160
161    // build the actual processing graph
162    while( iterator.hasNext() )
163      iterator.next().bind( this );
164
165    iterator = getReversedTopologicalOrderIterator();
166
167    // initialize all the ducts
168    while( iterator.hasNext() )
169      iterator.next().initialize();
170    }
171
172  /** Calls prepare starting at the tail and working backwards */
173  public void prepare()
174    {
175    TopologicalOrderIterator<Duct, Integer> iterator = getReversedTopologicalOrderIterator();
176
177    while( iterator.hasNext() )
178      iterator.next().prepare();
179    }
180
181  /** Calls cleanup starting at the head and working forwards */
182  public void cleanup()
183    {
184    TopologicalOrderIterator<Duct, Integer> iterator = getTopologicalOrderIterator();
185
186    while( iterator.hasNext() )
187      iterator.next().cleanup();
188    }
189
190  public Collection<Duct> getHeads()
191    {
192    return Graphs.successorListOf( ductGraph, getHEAD() );
193    }
194
195  public Collection<Duct> getTails()
196    {
197    return Graphs.predecessorListOf( ductGraph, getTAIL() );
198    }
199
200  public Duct[] findAllNextFor( Duct current )
201    {
202    Set<DuctGraph.Ordinal> outgoing = ductGraph.outgoingEdgesOf( current );
203    LinkedList<Duct> successors = new LinkedList<Duct>();
204
205    for( DuctGraph.Ordinal edge : outgoing )
206      {
207      Duct successor = ductGraph.getEdgeTarget( edge );
208
209      if( successor == getHEAD() )
210        throw new IllegalStateException( "HEAD may not be next" );
211
212      if( successor == getTAIL() ) // tail is not included, its just a marker
213        continue;
214
215      successor = wrapWithOrdinal( edge, successor );
216
217      successors.add( successor );
218      }
219
220    return successors.toArray( new Duct[ successors.size() ] );
221    }
222
223  public Duct[] findAllPreviousFor( Duct current )
224    {
225    LinkedList<Duct> predecessors = new LinkedList<Duct>( Graphs.predecessorListOf( ductGraph, current ) );
226    ListIterator<Duct> iterator = predecessors.listIterator();
227
228    while( iterator.hasNext() )
229      {
230      Duct successor = iterator.next();
231
232      if( successor == getTAIL() )
233        throw new IllegalStateException( "TAIL may not be successor" );
234
235      if( successor == getHEAD() ) // head is not included, its just a marker
236        iterator.remove();
237      }
238
239    return predecessors.toArray( new Duct[ predecessors.size() ] );
240    }
241
242  public Duct createNextFor( Duct current )
243    {
244    if( current == getHEAD() || current == getTAIL() )
245      return null;
246
247    Set<DuctGraph.Ordinal> edges = ductGraph.outgoingEdgesOf( current );
248
249    if( edges.size() == 0 )
250      throw new IllegalStateException( "ducts must have an outgoing edge, current: " + current );
251
252    DuctGraph.Ordinal edge = edges.iterator().next();
253    Duct next = ductGraph.getEdgeTarget( edge );
254
255    if( current instanceof Gate )
256      {
257      if( !( current instanceof Window ) ) // just collapse - Merge
258        {
259        if( edges.size() > 1 )
260          return createFork( findAllNextFor( current ) );
261
262        return wrapWithOrdinal( edge, next );
263        }
264
265      if( next instanceof OpenWindow )
266        return next;
267
268      if( edges.size() > 1 )
269        return createOpenWindow( createFork( findAllNextFor( current ) ) );
270
271      if( next instanceof Reducing )
272        return createOpenReducingWindow( next );
273
274      return createOpenWindow( wrapWithOrdinal( edge, next ) );
275      }
276
277    if( current instanceof Reducing )
278      {
279      if( next instanceof Reducing )
280        return next;
281
282      if( edges.size() > 1 )
283        return createCloseWindow( createFork( findAllNextFor( current ) ) );
284
285      return createCloseWindow( wrapWithOrdinal( edge, next ) );
286      }
287
288    if( edges.size() > 1 )
289      return createFork( findAllNextFor( current ) );
290
291    if( next == getTAIL() ) // tail is not included, its just a marker
292      throw new IllegalStateException( "tail ducts should not bind to next" );
293
294    return wrapWithOrdinal( edge, next );
295    }
296
297  protected Duct wrapWithOrdinal( DuctGraph.Ordinal edge, Duct next )
298    {
299    if( next instanceof Collapsing )
300      next = new OrdinalDuct( next, edge.getOrdinal() );
301
302    return next;
303    }
304
305  protected Duct createCloseWindow( Duct next )
306    {
307    return new CloseReducingDuct( next );
308    }
309
310  protected Duct createOpenWindow( Duct next )
311    {
312    return new OpenDuct( next );
313    }
314
315  protected Duct createOpenReducingWindow( Duct next )
316    {
317    return new OpenReducingDuct( next );
318    }
319
320  protected Duct createFork( Duct[] allNext )
321    {
322    return new Fork( allNext );
323    }
324
325  public int ordinalBetween( Duct lhs, Duct rhs )
326    {
327    return ductGraph.getEdge( lhs, rhs ).getOrdinal();
328    }
329
330  public TopologicalOrderIterator<Duct, Integer> getTopologicalOrderIterator()
331    {
332    try
333      {
334      return new TopologicalOrderIterator( ductGraph );
335      }
336    catch( RuntimeException exception )
337      {
338      LOG.error( "failed creating topological iterator", exception );
339      printGraphError();
340
341      throw exception;
342      }
343    }
344
345  public TopologicalOrderIterator<Duct, Integer> getReversedTopologicalOrderIterator()
346    {
347    try
348      {
349      return new TopologicalOrderIterator( getReversedGraph() );
350      }
351    catch( RuntimeException exception )
352      {
353      LOG.error( "failed creating reversed topological iterator", exception );
354      printGraphError();
355
356      throw exception;
357      }
358    }
359
360  public DirectedGraph getReversedGraph()
361    {
362    DuctGraph reversedGraph = new DuctGraph();
363
364    Graphs.addGraphReversed( reversedGraph, ductGraph );
365
366    return reversedGraph;
367    }
368
369  public Collection<Duct> getAllDucts()
370    {
371    return ductGraph.vertexSet();
372    }
373
374  public void printGraphError()
375    {
376    String filename = (String) getProperty( ERROR_DOT_FILE_NAME );
377
378    if( filename == null )
379      return;
380
381    printGraph( filename );
382    }
383
384  public void printGraph( String id, String classifier, int discriminator )
385    {
386    String path = (String) getProperty( DOT_FILE_PATH );
387
388    if( path == null )
389      return;
390
391    classifier = Util.cleansePathName( classifier );
392
393    path = String.format( "%s/streamgraph-%s-%s-%s.dot", path, id, classifier, discriminator );
394
395    printGraph( path );
396    }
397
398  public void printGraph( String filename )
399    {
400    LOG.info( "writing stream graph to {}", filename );
401    Util.printGraph( filename, ductGraph );
402    }
403
404  public void printBoundGraph( String id, String classifier, int discriminator )
405    {
406    String path = (String) getProperty( DOT_FILE_PATH );
407
408    if( path == null )
409      return;
410
411    classifier = Util.cleansePathName( classifier );
412
413    path = String.format( "%s/streamgraph-bound-%s-%s-%s.dot", path, id, classifier, discriminator );
414
415    printBoundGraph( path );
416    }
417
418  public void printBoundGraph( String filename )
419    {
420    LOG.info( "writing stream bound graph to {}", filename );
421
422    DirectedMultigraph<Duct, Integer> graph = new DirectedMultigraph<>( new EdgeFactory<Duct, Integer>()
423      {
424      int count = 0;
425
426      @Override
427      public Integer createEdge( Duct sourceVertex, Duct targetVertex )
428        {
429        return count++;
430        }
431      } );
432
433    TopologicalOrderIterator<Duct, Integer> iterator = getTopologicalOrderIterator();
434
435    while( iterator.hasNext() )
436      {
437      Duct previous = iterator.next();
438
439      if( graph.containsVertex( previous ) || previous instanceof Extent )
440        continue;
441
442      graph.addVertex( previous );
443      addNext( graph, previous );
444      }
445
446    Util.printGraph( filename, graph );
447    }
448
449  private void addNext( DirectedMultigraph graph, Duct previous )
450    {
451
452    if( previous instanceof Fork )
453      {
454      for( Duct next : ( (Fork) previous ).getAllNext() )
455        {
456        if( next == null || next instanceof Extent )
457          continue;
458
459        graph.addVertex( next );
460
461        if( graph.containsEdge( previous, next ) )
462          continue;
463
464        graph.addEdge( previous, next );
465
466        addNext( graph, next );
467        }
468      }
469    else
470      {
471      Duct next = previous.getNext();
472
473      if( next == null || next instanceof Extent )
474        return;
475
476      graph.addVertex( next );
477
478      if( graph.containsEdge( previous, next ) )
479        return;
480
481      graph.addEdge( previous, next );
482
483      addNext( graph, next );
484      }
485    }
486  }