001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.process;
023
024import java.io.FileWriter;
025import java.io.IOException;
026import java.io.Writer;
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.Comparator;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Map;
035import java.util.PriorityQueue;
036import java.util.Set;
037
038import cascading.flow.FlowElement;
039import cascading.flow.planner.Scope;
040import cascading.flow.planner.graph.AnnotatedGraph;
041import cascading.flow.planner.graph.ElementGraph;
042import cascading.flow.planner.graph.ElementGraphs;
043import cascading.flow.planner.graph.Extent;
044import cascading.pipe.Group;
045import cascading.tap.Tap;
046import cascading.util.EnumMultiMap;
047import cascading.util.Util;
048import cascading.util.jgrapht.IntegerNameProvider;
049import cascading.util.jgrapht.VertexNameProvider;
050import org.jgrapht.graph.SimpleDirectedGraph;
051import org.jgrapht.traverse.TopologicalOrderIterator;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import static cascading.util.Util.createIdentitySet;
056
057/**
058 *
059 */
060public abstract class BaseProcessGraph<Process extends ProcessModel> implements ProcessGraph<Process>
061  {
062  /** Field LOG */
063  private static final Logger LOG = LoggerFactory.getLogger( BaseProcessGraph.class );
064
065  final SimpleDirectedGraph<Process, ProcessEdge> graph;
066
067  protected Set<FlowElement> sourceElements = createIdentitySet();
068  protected Set<FlowElement> sinkElements = createIdentitySet();
069  private Set<Tap> sourceTaps;
070  private Set<Tap> sinkTaps;
071  protected Map<String, Tap> trapsMap = new HashMap<>();
072
073  public BaseProcessGraph()
074    {
075    graph = new SimpleDirectedGraph( ProcessEdge.class );
076    }
077
078  @Override
079  public boolean addVertex( Process process )
080    {
081    sourceElements.addAll( process.getSourceElements() );
082    sinkElements.addAll( process.getSinkElements() );
083    trapsMap.putAll( process.getTrapMap() );
084
085    return graph.addVertex( process );
086    }
087
088  public void bindEdges()
089    {
090    for( Process sinkProcess : vertexSet() )
091      {
092      for( Process sourceProcess : vertexSet() )
093        {
094        if( sourceProcess == sinkProcess )
095          continue;
096
097        // outer edge sources and sinks to this graph
098        sourceElements.removeAll( sinkProcess.getSinkElements() );
099        sinkElements.removeAll( sourceProcess.getSourceElements() );
100        }
101      }
102
103    for( Process sinkProcess : vertexSet() )
104      {
105      for( Process sourceProcess : vertexSet() )
106        {
107        if( sourceProcess == sinkProcess )
108          continue;
109
110        for( FlowElement intermediate : sourceProcess.getSinkElements() )
111          {
112          if( sinkProcess.getSourceElements().contains( intermediate ) )
113            addEdge( sourceProcess, sinkProcess, new ProcessEdge<>( sourceProcess, intermediate, sinkProcess ) );
114          }
115        }
116      }
117
118    // clear tap map caches
119    sourceTaps = null;
120    sinkTaps = null;
121    }
122
123  @Override
124  public Set<FlowElement> getSourceElements()
125    {
126    return sourceElements;
127    }
128
129  @Override
130  public Set<FlowElement> getSinkElements()
131    {
132    return sinkElements;
133    }
134
135  @Override
136  public Set<Tap> getSourceTaps()
137    {
138    if( sourceTaps != null )
139      return sourceTaps;
140
141    sourceTaps = Util.narrowIdentitySet( Tap.class, getSourceElements() );
142
143    return sourceTaps;
144    }
145
146  @Override
147  public Map<String, Tap> getSourceTapsMap()
148    {
149    Map<String, Tap> result = new HashMap<>();
150    Set<Tap> sourceTaps = getSourceTaps();
151
152    for( Tap sourceTap : sourceTaps )
153      {
154      for( Process process : graph.vertexSet() )
155        {
156        if( !process.getSourceTaps().contains( sourceTap ) )
157          continue;
158
159        ElementGraph elementGraph = process.getElementGraph();
160
161        for( Scope scope : elementGraph.outgoingEdgesOf( sourceTap ) )
162          result.put( scope.getName(), sourceTap );
163        }
164      }
165
166    return result;
167    }
168
169  @Override
170  public Set<Tap> getSinkTaps()
171    {
172    if( sinkTaps != null )
173      return sinkTaps;
174
175    sinkTaps = Util.narrowIdentitySet( Tap.class, getSinkElements() );
176
177    return sinkTaps;
178    }
179
180  @Override
181  public Map<String, Tap> getSinkTapsMap()
182    {
183    Map<String, Tap> result = new HashMap<>();
184    Set<Tap> sinkTaps = getSinkTaps();
185
186    for( Tap sinkTap : sinkTaps )
187      {
188      for( Process process : graph.vertexSet() )
189        {
190        if( !process.getSinkTaps().contains( sinkTap ) )
191          continue;
192
193        ElementGraph elementGraph = process.getElementGraph();
194
195        for( Scope scope : elementGraph.incomingEdgesOf( sinkTap ) )
196          result.put( scope.getName(), sinkTap );
197        }
198      }
199
200    return result;
201    }
202
203  @Override
204  public Map<String, Tap> getTrapsMap()
205    {
206    return trapsMap;
207    }
208
209  @Override
210  public Iterator<Process> getTopologicalIterator()
211    {
212    return getOrderedTopologicalIterator( new Comparator<Process>()
213      {
214      @Override
215      public int compare( Process lhs, Process rhs )
216        {
217        return Integer.valueOf( lhs.getSubmitPriority() ).compareTo( rhs.getSubmitPriority() );
218        }
219      } );
220    }
221
222  @Override
223  public Iterator<Process> getOrdinalTopologicalIterator()
224    {
225    return getOrderedTopologicalIterator( new Comparator<Process>()
226      {
227      @Override
228      public int compare( Process lhs, Process rhs )
229        {
230        return Integer.valueOf( lhs.getOrdinal() ).compareTo( rhs.getOrdinal() );
231        }
232      } );
233    }
234
235  @Override
236  public Iterator<Process> getOrderedTopologicalIterator( Comparator<Process> comparator )
237    {
238    return new TopologicalOrderIterator<>( graph, new PriorityQueue<>( 10, comparator ) );
239    }
240
241  @Override
242  public Set<ElementGraph> getElementGraphs()
243    {
244    Set<ElementGraph> results = createIdentitySet();
245
246    for( Process process : vertexSet() )
247      results.add( process.getElementGraph() );
248
249    return results;
250    }
251
252  @Override
253  public List<ElementGraph> getElementGraphs( FlowElement flowElement )
254    {
255    List<Process> elementProcesses = getElementProcesses( flowElement );
256
257    List<ElementGraph> elementGraphs = new ArrayList<>();
258
259    for( Process elementProcess : elementProcesses )
260      elementGraphs.add( elementProcess.getElementGraph() );
261
262    return elementGraphs;
263    }
264
265  @Override
266  public List<Process> getElementProcesses( FlowElement flowElement )
267    {
268    List<Process> processes = new ArrayList<>();
269
270    for( Process process : vertexSet() )
271      {
272      if( process.getElementGraph().vertexSet().contains( flowElement ) )
273        processes.add( process );
274      }
275
276    return processes;
277    }
278
279  @Override
280  public List<ElementGraph> getElementGraphs( Scope scope )
281    {
282    List<Process> elementProcesses = getElementProcesses( scope );
283
284    List<ElementGraph> elementGraphs = new ArrayList<>();
285
286    for( Process elementProcess : elementProcesses )
287      elementGraphs.add( elementProcess.getElementGraph() );
288
289    return elementGraphs;
290    }
291
292  @Override
293  public List<Process> getElementProcesses( Scope scope )
294    {
295    List<Process> processes = new ArrayList<>();
296
297    for( Process process : vertexSet() )
298      {
299      if( process.getElementGraph().edgeSet().contains( scope ) )
300        processes.add( process );
301      }
302
303    return processes;
304    }
305
306  @Override
307  public List<Process> getElementSourceProcesses( FlowElement flowElement )
308    {
309    List<Process> sources = new ArrayList<>();
310
311    for( Process process : vertexSet() )
312      {
313      if( process.getSinkElements().contains( flowElement ) )
314        sources.add( process );
315      }
316
317    return sources;
318    }
319
320  @Override
321  public List<Process> getElementSinkProcesses( FlowElement flowElement )
322    {
323    List<Process> sinks = new ArrayList<>();
324
325    for( Process process : vertexSet() )
326      {
327      if( process.getSourceElements().contains( flowElement ) )
328        sinks.add( process );
329      }
330
331    return sinks;
332    }
333
334  @Override
335  public Set<FlowElement> getAllSourceElements()
336    {
337    Set<FlowElement> results = createIdentitySet();
338
339    for( Process process : vertexSet() )
340      results.addAll( process.getSourceElements() );
341
342    return results;
343    }
344
345  @Override
346  public Set<FlowElement> getAllSinkElements()
347    {
348    Set<FlowElement> results = createIdentitySet();
349
350    for( Process process : vertexSet() )
351      results.addAll( process.getSinkElements() );
352
353    return results;
354    }
355
356  public EnumMultiMap<FlowElement> getAnnotations()
357    {
358    EnumMultiMap<FlowElement> annotations = new EnumMultiMap<>();
359
360    for( Process process : vertexSet() )
361      {
362      ElementGraph elementGraph = process.getElementGraph();
363
364      if( elementGraph instanceof AnnotatedGraph )
365        annotations.addAll( ( (AnnotatedGraph) elementGraph ).getAnnotations() );
366      }
367
368    return annotations;
369    }
370
371  /**
372   * All elements, from the given ElementGraph, that belong to two or more processes, that are not sink or source elements that
373   * connect processes.
374   *
375   * @return Set
376   */
377  @Override
378  public Set<FlowElement> getDuplicatedElements( ElementGraph elementGraph )
379    {
380    Set<FlowElement> results = createIdentitySet();
381
382    for( FlowElement flowElement : elementGraph.vertexSet() )
383      {
384      if( getElementProcesses( flowElement ).size() > 1 )
385        results.add( flowElement );
386      }
387
388    results.remove( Extent.head );
389    results.remove( Extent.tail );
390    results.removeAll( getAllSourceElements() );
391    results.removeAll( getAllSinkElements() );
392
393    return results;
394    }
395
396  @Override
397  public Set<ElementGraph> getIdentityElementGraphs()
398    {
399    Set<ElementGraph> results = createIdentitySet();
400
401    for( Process process : getIdentityProcesses() )
402      results.add( process.getElementGraph() );
403
404    return results;
405    }
406
407  /**
408   * Returns a set of processes that perform no internal operations.
409   * <p>
410   * for example if a FlowNode only has a Merge source and a GroupBy sink.
411   *
412   * @return
413   */
414  @Override
415  public Set<Process> getIdentityProcesses()
416    {
417    Set<Process> results = new HashSet<>();
418
419    for( Process process : vertexSet() )
420      {
421      if( ProcessModels.isIdentity( process ) )
422        results.add( process );
423      }
424
425    return results;
426    }
427
428  /**
429   * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
430   *
431   * @param filename of type String
432   */
433  @Override
434  public void writeDOT( String filename )
435    {
436    printProcessGraph( filename );
437    }
438
439  protected void printProcessGraph( String filename )
440    {
441    try
442      {
443      Writer writer = new FileWriter( filename );
444
445      Util.writeDOT( writer, graph, new IntegerNameProvider<Process>(), new VertexNameProvider<Process>()
446        {
447        public String getVertexName( Process process )
448          {
449          String name = "[" + process.getName() + "]";
450
451          String sourceName = "";
452          Set<Tap> sources = process.getSourceTaps();
453          for( Tap source : sources )
454            sourceName += "\\nsrc:[" + source.getIdentifier() + "]";
455
456          if( sourceName.length() != 0 )
457            name += sourceName;
458
459          Collection<Group> groups = process.getGroups();
460
461          for( Group group : groups )
462            {
463            String groupName = group.getName();
464
465            if( groupName.length() != 0 )
466              name += "\\ngrp:" + groupName;
467            }
468
469          Set<Tap> sinks = process.getSinkTaps();
470          String sinkName = "";
471          for( Tap sink : sinks )
472            sinkName = "\\nsnk:[" + sink.getIdentifier() + "]";
473
474          if( sinkName.length() != 0 )
475            name += sinkName;
476
477          return name.replaceAll( "\"", "\'" );
478          }
479        }, null );
480
481      writer.close();
482      }
483    catch( IOException exception )
484      {
485      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
486      }
487    }
488
489  @Override
490  public void writeDOTNested( String filename, ElementGraph graph )
491    {
492    ElementGraphs.printProcessGraph( filename, graph, this );
493    }
494
495  public boolean containsEdge( Process sourceVertex, Process targetVertex )
496    {
497    return graph.containsEdge( sourceVertex, targetVertex );
498    }
499
500  public boolean removeAllEdges( Collection<? extends ProcessEdge> edges )
501    {
502    return graph.removeAllEdges( edges );
503    }
504
505  public Set<ProcessEdge> removeAllEdges( Process sourceVertex, Process targetVertex )
506    {
507    return graph.removeAllEdges( sourceVertex, targetVertex );
508    }
509
510  public boolean removeAllVertices( Collection<? extends Process> vertices )
511    {
512    return graph.removeAllVertices( vertices );
513    }
514
515  public Set<ProcessEdge> getAllEdges( Process sourceVertex, Process targetVertex )
516    {
517    return graph.getAllEdges( sourceVertex, targetVertex );
518    }
519
520  public ProcessEdge getEdge( Process sourceVertex, Process targetVertex )
521    {
522    return graph.getEdge( sourceVertex, targetVertex );
523    }
524
525  public ProcessEdge addEdge( Process sourceVertex, Process targetVertex )
526    {
527    return graph.addEdge( sourceVertex, targetVertex );
528    }
529
530  public boolean addEdge( Process sourceVertex, Process targetVertex, ProcessEdge processEdge )
531    {
532    return graph.addEdge( sourceVertex, targetVertex, processEdge );
533    }
534
535  public Process getEdgeSource( ProcessEdge processEdge )
536    {
537    return graph.getEdgeSource( processEdge );
538    }
539
540  public Process getEdgeTarget( ProcessEdge processEdge )
541    {
542    return graph.getEdgeTarget( processEdge );
543    }
544
545  public boolean containsEdge( ProcessEdge processEdge )
546    {
547    return graph.containsEdge( processEdge );
548    }
549
550  public boolean containsVertex( Process process )
551    {
552    return graph.containsVertex( process );
553    }
554
555  public Set<ProcessEdge> edgeSet()
556    {
557    return graph.edgeSet();
558    }
559
560  public Set<ProcessEdge> edgesOf( Process vertex )
561    {
562    return graph.edgesOf( vertex );
563    }
564
565  public int inDegreeOf( Process vertex )
566    {
567    return graph.inDegreeOf( vertex );
568    }
569
570  public Set<ProcessEdge> incomingEdgesOf( Process vertex )
571    {
572    return graph.incomingEdgesOf( vertex );
573    }
574
575  public int outDegreeOf( Process vertex )
576    {
577    return graph.outDegreeOf( vertex );
578    }
579
580  public Set<ProcessEdge> outgoingEdgesOf( Process vertex )
581    {
582    return graph.outgoingEdgesOf( vertex );
583    }
584
585  public ProcessEdge removeEdge( Process sourceVertex, Process targetVertex )
586    {
587    return graph.removeEdge( sourceVertex, targetVertex );
588    }
589
590  public boolean removeEdge( ProcessEdge processEdge )
591    {
592    return graph.removeEdge( processEdge );
593    }
594
595  public boolean removeVertex( Process process )
596    {
597    return graph.removeVertex( process );
598    }
599
600  public Set<Process> vertexSet()
601    {
602    return graph.vertexSet();
603    }
604
605  public double getEdgeWeight( ProcessEdge processEdge )
606    {
607    return graph.getEdgeWeight( processEdge );
608    }
609  }