001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.process;
022
023import java.io.FileWriter;
024import java.io.IOException;
025import java.io.Writer;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Comparator;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.PriorityQueue;
035import java.util.Set;
036
037import cascading.flow.FlowElement;
038import cascading.flow.planner.Scope;
039import cascading.flow.planner.graph.AnnotatedGraph;
040import cascading.flow.planner.graph.ElementGraph;
041import cascading.flow.planner.graph.ElementGraphs;
042import cascading.flow.planner.graph.Extent;
043import cascading.pipe.Group;
044import cascading.tap.Tap;
045import cascading.util.EnumMultiMap;
046import cascading.util.Util;
047import cascading.util.jgrapht.IntegerNameProvider;
048import cascading.util.jgrapht.VertexNameProvider;
049import org.jgrapht.graph.SimpleDirectedGraph;
050import org.jgrapht.traverse.TopologicalOrderIterator;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import static cascading.util.Util.createIdentitySet;
055
056/**
057 *
058 */
059public abstract class BaseProcessGraph<Process extends ProcessModel> implements ProcessGraph<Process>
060  {
061  /** Field LOG */
062  private static final Logger LOG = LoggerFactory.getLogger( BaseProcessGraph.class );
063
064  final SimpleDirectedGraph<Process, ProcessEdge> graph;
065
066  protected Set<FlowElement> sourceElements = createIdentitySet();
067  protected Set<FlowElement> sinkElements = createIdentitySet();
068  private Set<Tap> sourceTaps;
069  private Set<Tap> sinkTaps;
070  protected Map<String, Tap> trapsMap = new HashMap<>();
071
072  public BaseProcessGraph()
073    {
074    graph = new SimpleDirectedGraph( ProcessEdge.class );
075    }
076
077  @Override
078  public boolean addVertex( Process process )
079    {
080    sourceElements.addAll( process.getSourceElements() );
081    sinkElements.addAll( process.getSinkElements() );
082    trapsMap.putAll( process.getTrapMap() );
083
084    return graph.addVertex( process );
085    }
086
087  public void bindEdges()
088    {
089    for( Process sinkProcess : vertexSet() )
090      {
091      for( Process sourceProcess : vertexSet() )
092        {
093        if( sourceProcess == sinkProcess )
094          continue;
095
096        // outer edge sources and sinks to this graph
097        sourceElements.removeAll( sinkProcess.getSinkElements() );
098        sinkElements.removeAll( sourceProcess.getSourceElements() );
099        }
100      }
101
102    for( Process sinkProcess : vertexSet() )
103      {
104      for( Process sourceProcess : vertexSet() )
105        {
106        if( sourceProcess == sinkProcess )
107          continue;
108
109        for( FlowElement intermediate : sourceProcess.getSinkElements() )
110          {
111          if( sinkProcess.getSourceElements().contains( intermediate ) )
112            addEdge( sourceProcess, sinkProcess, new ProcessEdge<>( sourceProcess, intermediate, sinkProcess ) );
113          }
114        }
115      }
116
117    // clear tap map caches
118    sourceTaps = null;
119    sinkTaps = null;
120    }
121
122  @Override
123  public Set<FlowElement> getSourceElements()
124    {
125    return sourceElements;
126    }
127
128  @Override
129  public Set<FlowElement> getSinkElements()
130    {
131    return sinkElements;
132    }
133
134  @Override
135  public Set<Tap> getSourceTaps()
136    {
137    if( sourceTaps != null )
138      return sourceTaps;
139
140    sourceTaps = Util.narrowIdentitySet( Tap.class, getSourceElements() );
141
142    return sourceTaps;
143    }
144
145  @Override
146  public Map<String, Tap> getSourceTapsMap()
147    {
148    Map<String, Tap> result = new HashMap<>();
149    Set<Tap> sourceTaps = getSourceTaps();
150
151    for( Tap sourceTap : sourceTaps )
152      {
153      for( Process process : graph.vertexSet() )
154        {
155        if( !process.getSourceTaps().contains( sourceTap ) )
156          continue;
157
158        ElementGraph elementGraph = process.getElementGraph();
159
160        for( Scope scope : elementGraph.outgoingEdgesOf( sourceTap ) )
161          result.put( scope.getName(), sourceTap );
162        }
163      }
164
165    return result;
166    }
167
168  @Override
169  public Set<Tap> getSinkTaps()
170    {
171    if( sinkTaps != null )
172      return sinkTaps;
173
174    sinkTaps = Util.narrowIdentitySet( Tap.class, getSinkElements() );
175
176    return sinkTaps;
177    }
178
179  @Override
180  public Map<String, Tap> getSinkTapsMap()
181    {
182    Map<String, Tap> result = new HashMap<>();
183    Set<Tap> sinkTaps = getSinkTaps();
184
185    for( Tap sinkTap : sinkTaps )
186      {
187      for( Process process : graph.vertexSet() )
188        {
189        if( !process.getSinkTaps().contains( sinkTap ) )
190          continue;
191
192        ElementGraph elementGraph = process.getElementGraph();
193
194        for( Scope scope : elementGraph.incomingEdgesOf( sinkTap ) )
195          result.put( scope.getName(), sinkTap );
196        }
197      }
198
199    return result;
200    }
201
202  @Override
203  public Map<String, Tap> getTrapsMap()
204    {
205    return trapsMap;
206    }
207
208  @Override
209  public Iterator<Process> getTopologicalIterator()
210    {
211    return getOrderedTopologicalIterator( new Comparator<Process>()
212    {
213    @Override
214    public int compare( Process lhs, Process rhs )
215      {
216      return Integer.valueOf( lhs.getSubmitPriority() ).compareTo( rhs.getSubmitPriority() );
217      }
218    } );
219    }
220
221  @Override
222  public Iterator<Process> getOrdinalTopologicalIterator()
223    {
224    return getOrderedTopologicalIterator( new Comparator<Process>()
225    {
226    @Override
227    public int compare( Process lhs, Process rhs )
228      {
229      return Integer.valueOf( lhs.getOrdinal() ).compareTo( rhs.getOrdinal() );
230      }
231    } );
232    }
233
234  @Override
235  public Iterator<Process> getOrderedTopologicalIterator( Comparator<Process> comparator )
236    {
237    return new TopologicalOrderIterator<>( graph, new PriorityQueue<>( 10, comparator ) );
238    }
239
240  @Override
241  public Set<ElementGraph> getElementGraphs()
242    {
243    Set<ElementGraph> results = createIdentitySet();
244
245    for( Process process : vertexSet() )
246      results.add( process.getElementGraph() );
247
248    return results;
249    }
250
251  @Override
252  public List<ElementGraph> getElementGraphs( FlowElement flowElement )
253    {
254    List<Process> elementProcesses = getElementProcesses( flowElement );
255
256    List<ElementGraph> elementGraphs = new ArrayList<>();
257
258    for( Process elementProcess : elementProcesses )
259      elementGraphs.add( elementProcess.getElementGraph() );
260
261    return elementGraphs;
262    }
263
264  @Override
265  public List<Process> getElementProcesses( FlowElement flowElement )
266    {
267    List<Process> processes = new ArrayList<>();
268
269    for( Process process : vertexSet() )
270      {
271      if( process.getElementGraph().vertexSet().contains( flowElement ) )
272        processes.add( process );
273      }
274
275    return processes;
276    }
277
278  @Override
279  public List<ElementGraph> getElementGraphs( Scope scope )
280    {
281    List<Process> elementProcesses = getElementProcesses( scope );
282
283    List<ElementGraph> elementGraphs = new ArrayList<>();
284
285    for( Process elementProcess : elementProcesses )
286      elementGraphs.add( elementProcess.getElementGraph() );
287
288    return elementGraphs;
289    }
290
291  @Override
292  public List<Process> getElementProcesses( Scope scope )
293    {
294    List<Process> processes = new ArrayList<>();
295
296    for( Process process : vertexSet() )
297      {
298      if( process.getElementGraph().edgeSet().contains( scope ) )
299        processes.add( process );
300      }
301
302    return processes;
303    }
304
305  @Override
306  public List<Process> getElementSourceProcesses( FlowElement flowElement )
307    {
308    List<Process> sources = new ArrayList<>();
309
310    for( Process process : vertexSet() )
311      {
312      if( process.getSinkElements().contains( flowElement ) )
313        sources.add( process );
314      }
315
316    return sources;
317    }
318
319  @Override
320  public List<Process> getElementSinkProcesses( FlowElement flowElement )
321    {
322    List<Process> sinks = new ArrayList<>();
323
324    for( Process process : vertexSet() )
325      {
326      if( process.getSourceElements().contains( flowElement ) )
327        sinks.add( process );
328      }
329
330    return sinks;
331    }
332
333  @Override
334  public Set<FlowElement> getAllSourceElements()
335    {
336    Set<FlowElement> results = createIdentitySet();
337
338    for( Process process : vertexSet() )
339      results.addAll( process.getSourceElements() );
340
341    return results;
342    }
343
344  @Override
345  public Set<FlowElement> getAllSinkElements()
346    {
347    Set<FlowElement> results = createIdentitySet();
348
349    for( Process process : vertexSet() )
350      results.addAll( process.getSinkElements() );
351
352    return results;
353    }
354
355  public EnumMultiMap<FlowElement> getAnnotations()
356    {
357    EnumMultiMap<FlowElement> annotations = new EnumMultiMap<>();
358
359    for( Process process : vertexSet() )
360      {
361      ElementGraph elementGraph = process.getElementGraph();
362
363      if( elementGraph instanceof AnnotatedGraph )
364        annotations.addAll( ( (AnnotatedGraph) elementGraph ).getAnnotations() );
365      }
366
367    return annotations;
368    }
369
370  /**
371   * All elements, from the given ElementGraph, that belong to two or more processes, that are not sink or source elements that
372   * connect processes.
373   *
374   * @return Set
375   */
376  @Override
377  public Set<FlowElement> getDuplicatedElements( ElementGraph elementGraph )
378    {
379    Set<FlowElement> results = createIdentitySet();
380
381    for( FlowElement flowElement : elementGraph.vertexSet() )
382      {
383      if( getElementProcesses( flowElement ).size() > 1 )
384        results.add( flowElement );
385      }
386
387    results.remove( Extent.head );
388    results.remove( Extent.tail );
389    results.removeAll( getAllSourceElements() );
390    results.removeAll( getAllSinkElements() );
391
392    return results;
393    }
394
395  @Override
396  public Set<ElementGraph> getIdentityElementGraphs()
397    {
398    Set<ElementGraph> results = createIdentitySet();
399
400    for( Process process : getIdentityProcesses() )
401      results.add( process.getElementGraph() );
402
403    return results;
404    }
405
406  /**
407   * Returns a set of processes that perform no internal operations.
408   * <p/>
409   * for example if a FlowNode only has a Merge source and a GroupBy sink.
410   *
411   * @return
412   */
413  @Override
414  public Set<Process> getIdentityProcesses()
415    {
416    Set<Process> results = new HashSet<>();
417
418    for( Process process : vertexSet() )
419      {
420      if( ProcessModels.isIdentity( process ) )
421        results.add( process );
422      }
423
424    return results;
425    }
426
427  /**
428   * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
429   *
430   * @param filename of type String
431   */
432  @Override
433  public void writeDOT( String filename )
434    {
435    printProcessGraph( filename );
436    }
437
438  protected void printProcessGraph( String filename )
439    {
440    try
441      {
442      Writer writer = new FileWriter( filename );
443
444      Util.writeDOT( writer, graph, new IntegerNameProvider<Process>(), new VertexNameProvider<Process>()
445      {
446      public String getVertexName( Process process )
447        {
448        String name = "[" + process.getName() + "]";
449
450        String sourceName = "";
451        Set<Tap> sources = process.getSourceTaps();
452        for( Tap source : sources )
453          sourceName += "\\nsrc:[" + source.getIdentifier() + "]";
454
455        if( sourceName.length() != 0 )
456          name += sourceName;
457
458        Collection<Group> groups = process.getGroups();
459
460        for( Group group : groups )
461          {
462          String groupName = group.getName();
463
464          if( groupName.length() != 0 )
465            name += "\\ngrp:" + groupName;
466          }
467
468        Set<Tap> sinks = process.getSinkTaps();
469        String sinkName = "";
470        for( Tap sink : sinks )
471          sinkName = "\\nsnk:[" + sink.getIdentifier() + "]";
472
473        if( sinkName.length() != 0 )
474          name += sinkName;
475
476        return name.replaceAll( "\"", "\'" );
477        }
478      }, null );
479
480      writer.close();
481      }
482    catch( IOException exception )
483      {
484      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
485      }
486    }
487
488  @Override
489  public void writeDOTNested( String filename, ElementGraph graph )
490    {
491    ElementGraphs.printProcessGraph( filename, graph, this );
492    }
493
494  public boolean containsEdge( Process sourceVertex, Process targetVertex )
495    {
496    return graph.containsEdge( sourceVertex, targetVertex );
497    }
498
499  public boolean removeAllEdges( Collection<? extends ProcessEdge> edges )
500    {
501    return graph.removeAllEdges( edges );
502    }
503
504  public Set<ProcessEdge> removeAllEdges( Process sourceVertex, Process targetVertex )
505    {
506    return graph.removeAllEdges( sourceVertex, targetVertex );
507    }
508
509  public boolean removeAllVertices( Collection<? extends Process> vertices )
510    {
511    return graph.removeAllVertices( vertices );
512    }
513
514  public Set<ProcessEdge> getAllEdges( Process sourceVertex, Process targetVertex )
515    {
516    return graph.getAllEdges( sourceVertex, targetVertex );
517    }
518
519  public ProcessEdge getEdge( Process sourceVertex, Process targetVertex )
520    {
521    return graph.getEdge( sourceVertex, targetVertex );
522    }
523
524  public ProcessEdge addEdge( Process sourceVertex, Process targetVertex )
525    {
526    return graph.addEdge( sourceVertex, targetVertex );
527    }
528
529  public boolean addEdge( Process sourceVertex, Process targetVertex, ProcessEdge processEdge )
530    {
531    return graph.addEdge( sourceVertex, targetVertex, processEdge );
532    }
533
534  public Process getEdgeSource( ProcessEdge processEdge )
535    {
536    return graph.getEdgeSource( processEdge );
537    }
538
539  public Process getEdgeTarget( ProcessEdge processEdge )
540    {
541    return graph.getEdgeTarget( processEdge );
542    }
543
544  public boolean containsEdge( ProcessEdge processEdge )
545    {
546    return graph.containsEdge( processEdge );
547    }
548
549  public boolean containsVertex( Process process )
550    {
551    return graph.containsVertex( process );
552    }
553
554  public Set<ProcessEdge> edgeSet()
555    {
556    return graph.edgeSet();
557    }
558
559  public Set<ProcessEdge> edgesOf( Process vertex )
560    {
561    return graph.edgesOf( vertex );
562    }
563
564  public int inDegreeOf( Process vertex )
565    {
566    return graph.inDegreeOf( vertex );
567    }
568
569  public Set<ProcessEdge> incomingEdgesOf( Process vertex )
570    {
571    return graph.incomingEdgesOf( vertex );
572    }
573
574  public int outDegreeOf( Process vertex )
575    {
576    return graph.outDegreeOf( vertex );
577    }
578
579  public Set<ProcessEdge> outgoingEdgesOf( Process vertex )
580    {
581    return graph.outgoingEdgesOf( vertex );
582    }
583
584  public ProcessEdge removeEdge( Process sourceVertex, Process targetVertex )
585    {
586    return graph.removeEdge( sourceVertex, targetVertex );
587    }
588
589  public boolean removeEdge( ProcessEdge processEdge )
590    {
591    return graph.removeEdge( processEdge );
592    }
593
594  public boolean removeVertex( Process process )
595    {
596    return graph.removeVertex( process );
597    }
598
599  public Set<Process> vertexSet()
600    {
601    return graph.vertexSet();
602    }
603
604  public double getEdgeWeight( ProcessEdge processEdge )
605    {
606    return graph.getEdgeWeight( processEdge );
607    }
608  }