001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.planner;
022    
023    import java.util.ArrayList;
024    import java.util.Arrays;
025    import java.util.Collection;
026    import java.util.Collections;
027    import java.util.HashSet;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.Set;
031    
032    import cascading.flow.AssemblyPlanner;
033    import cascading.flow.Flow;
034    import cascading.flow.FlowConnector;
035    import cascading.flow.FlowConnectorProps;
036    import cascading.flow.FlowDef;
037    import cascading.flow.FlowElement;
038    import cascading.operation.AssertionLevel;
039    import cascading.operation.DebugLevel;
040    import cascading.pipe.Checkpoint;
041    import cascading.pipe.CoGroup;
042    import cascading.pipe.Each;
043    import cascading.pipe.Every;
044    import cascading.pipe.Group;
045    import cascading.pipe.GroupBy;
046    import cascading.pipe.HashJoin;
047    import cascading.pipe.Merge;
048    import cascading.pipe.OperatorException;
049    import cascading.pipe.Pipe;
050    import cascading.pipe.Splice;
051    import cascading.pipe.SubAssembly;
052    import cascading.property.ConfigDef;
053    import cascading.property.PropertyUtil;
054    import cascading.scheme.Scheme;
055    import cascading.tap.DecoratorTap;
056    import cascading.tap.Tap;
057    import cascading.tap.TapException;
058    import cascading.tuple.Fields;
059    import cascading.util.Update;
060    import cascading.util.Util;
061    import org.jgrapht.GraphPath;
062    import org.jgrapht.Graphs;
063    import org.slf4j.Logger;
064    import org.slf4j.LoggerFactory;
065    
066    import static cascading.flow.planner.ElementGraphs.*;
067    import static java.util.Arrays.asList;
068    
069    /** Class FlowPlanner is the base class for all planner implementations. */
070    public abstract class FlowPlanner<F extends Flow, Config>
071      {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( FlowPlanner.class );

  /** Field properties, assigned by {@link #initialize(FlowConnector, Map)} */
  protected Map<Object, Object> properties;

  /**
   * Field checkpointRootPath, reassigned on every call to createElementGraph to "flowName/runID",
   * or to null when the FlowDef has no run id
   */
  protected String checkpointRootPath = null;

  /** Field assertionLevel, used by createElementGraph when the FlowDef does not declare its own level */
  protected AssertionLevel assertionLevel;
  /** Field debugLevel, used by createElementGraph when the FlowDef does not declare its own level */
  protected DebugLevel debugLevel;
084    
085      /**
086       * Method getAssertionLevel returns the configured target planner {@link cascading.operation.AssertionLevel}.
087       *
088       * @param properties of type Map<Object, Object>
089       * @return AssertionLevel the configured AssertionLevel
090       */
091      static AssertionLevel getAssertionLevel( Map<Object, Object> properties )
092        {
093        String assertionLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.assertionlevel", AssertionLevel.STRICT.name() );
094    
095        return AssertionLevel.valueOf( assertionLevel );
096        }
097    
098      /**
099       * Method getDebugLevel returns the configured target planner {@link cascading.operation.DebugLevel}.
100       *
101       * @param properties of type Map<Object, Object>
102       * @return DebugLevel the configured DebugLevel
103       */
104      static DebugLevel getDebugLevel( Map<Object, Object> properties )
105        {
106        String debugLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.debuglevel", DebugLevel.DEFAULT.name() );
107    
108        return DebugLevel.valueOf( debugLevel );
109        }
110    
111      {
112      Update.registerPlanner( getClass() );
113      }
114    
115      public Map<Object, Object> getProperties()
116        {
117        return properties;
118        }
119    
  /**
   * Method getConfig returns the platform configuration object of the implementing planner.
   *
   * @return Config
   */
  public abstract Config getConfig();
121    
  /**
   * Method getPlatformInfo returns the {@link PlatformInfo} of the implementing planner. It is handed
   * to every ElementGraph built by createElementGraph.
   *
   * @return PlatformInfo
   */
  public abstract PlatformInfo getPlatformInfo();
123    
124      public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
125        {
126        this.properties = properties;
127        this.assertionLevel = getAssertionLevel( properties );
128        this.debugLevel = getDebugLevel( properties );
129        }
130    
  /**
   * Method createFlow creates the platform specific Flow instance for the given FlowDef.
   *
   * @param flowDef of type FlowDef
   * @return Flow
   */
  protected abstract Flow createFlow( FlowDef flowDef );
132    
  /**
   * Method buildFlow renders the actual Flow instance.
   *
   * @param flowDef of type FlowDef, the definition the Flow is built from
   * @return Flow
   */
  public abstract F buildFlow( FlowDef flowDef );
140    
141      protected Pipe[] resolveTails( FlowDef flowDef, Flow<Config> flow )
142        {
143        Pipe[] tails = flowDef.getTailsArray();
144    
145        tails = resolveAssemblyPlanners( flowDef, flow, tails );
146    
147        return tails;
148        }
149    
150      protected Pipe[] resolveAssemblyPlanners( FlowDef flowDef, Flow flow, Pipe[] pipes )
151        {
152        List<Pipe> tails = Arrays.asList( pipes );
153    
154        List<AssemblyPlanner> assemblyPlanners = flowDef.getAssemblyPlanners();
155    
156        for( AssemblyPlanner assemblyPlanner : assemblyPlanners )
157          {
158          tails = assemblyPlanner.resolveTails( new AssemblyPlannerContext( flowDef, flow, tails ) );
159    
160          if( tails.isEmpty() )
161            throw new PlannerException( "assembly planner: " + assemblyPlanner + ", returned zero tails" );
162    
163          tails = Collections.unmodifiableList( tails );
164          }
165    
166        return tails.toArray( new Pipe[ tails.size() ] );
167        }
168    
  /**
   * Method verifyAssembly runs the pipe assembly checks against the given FlowDef and tails: first the
   * head and tail end points, then the traps, then the checkpoints.
   *
   * @param flowDef of type FlowDef
   * @param tails   of type Pipe[]
   */
  protected void verifyAssembly( FlowDef flowDef, Pipe[] tails )
    {
    verifyPipeAssemblyEndPoints( flowDef, tails );
    verifyTraps( flowDef, tails );
    verifyCheckpoints( flowDef, tails );
    }
175    
176      protected void verifyAllTaps( FlowDef flowDef )
177        {
178        verifySourceNotSinks( flowDef.getSources(), flowDef.getSinks() );
179    
180        verifyTaps( flowDef.getSources(), true, true );
181        verifyTaps( flowDef.getSinks(), false, true );
182        verifyTaps( flowDef.getTraps(), false, false );
183    
184        // are both sources and sinks
185        verifyTaps( flowDef.getCheckpoints(), true, false );
186        verifyTaps( flowDef.getCheckpoints(), false, false );
187        }
188    
189      protected ElementGraph createElementGraph( FlowDef flowDef, Pipe[] flowTails )
190        {
191        Map<String, Tap> sources = flowDef.getSourcesCopy();
192        Map<String, Tap> sinks = flowDef.getSinksCopy();
193        Map<String, Tap> traps = flowDef.getTrapsCopy();
194        Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();
195    
196        AssertionLevel assertionLevel = flowDef.getAssertionLevel() == null ? this.assertionLevel : flowDef.getAssertionLevel();
197        DebugLevel debugLevel = flowDef.getDebugLevel() == null ? this.debugLevel : flowDef.getDebugLevel();
198    
199        checkpointRootPath = makeCheckpointRootPath( flowDef );
200    
201        return new ElementGraph( getPlatformInfo(), flowTails, sources, sinks, traps, checkpoints, checkpointRootPath != null, assertionLevel, debugLevel );
202        }
203    
204      private String makeCheckpointRootPath( FlowDef flowDef )
205        {
206        String flowName = flowDef.getName();
207        String runID = flowDef.getRunID();
208    
209        if( runID == null )
210          return null;
211    
212        if( flowName == null )
213          throw new PlannerException( "flow name is required when providing a run id" );
214    
215        return flowName + "/" + runID;
216        }
217    
218      protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks )
219        {
220        Collection<Tap> sourcesSet = sources.values();
221    
222        for( Tap tap : sinks.values() )
223          {
224          if( sourcesSet.contains( tap ) )
225            throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap );
226          }
227        }
228    
229      /**
230       * Method verifyTaps ...
231       *
232       * @param taps          of type Map<String, Tap>
233       * @param areSources    of type boolean
234       * @param mayNotBeEmpty of type boolean
235       */
236      protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean mayNotBeEmpty )
237        {
238        if( mayNotBeEmpty && taps.isEmpty() )
239          throw new PlannerException( ( areSources ? "source" : "sink" ) + " taps are required" );
240    
241        for( String tapName : taps.keySet() )
242          {
243          if( areSources && !taps.get( tapName ).isSource() )
244            throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a source: " + taps.get( tapName ) );
245          else if( !areSources && !taps.get( tapName ).isSink() )
246            throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a sink: " + taps.get( tapName ) );
247          }
248        }
249    
250      /**
251       * Method verifyEndPoints verifies
252       * <p/>
253       * there aren't dupe names in heads or tails.
254       * all the sink and source tap names match up with tail and head pipes
255       */
256      // todo: force dupe names to throw exceptions
257      protected void verifyPipeAssemblyEndPoints( FlowDef flowDef, Pipe[] flowTails )
258        {
259        Set<String> tapNames = new HashSet<String>();
260    
261        tapNames.addAll( flowDef.getSources().keySet() );
262        tapNames.addAll( flowDef.getSinks().keySet() );
263    
264        // handle tails
265        Set<Pipe> tails = new HashSet<Pipe>();
266        Set<String> tailNames = new HashSet<String>();
267    
268        for( Pipe pipe : flowTails )
269          {
270          if( pipe instanceof SubAssembly )
271            {
272            for( Pipe tail : ( (SubAssembly) pipe ).getTails() )
273              {
274              String tailName = tail.getName();
275    
276              if( !tapNames.contains( tailName ) )
277                throw new PlannerException( tail, "pipe name not found in either sink or source map: '" + tailName + "'" );
278    
279              if( tailNames.contains( tailName ) && !tails.contains( tail ) )
280                LOG.warn( "duplicate tail name found: '{}'", tailName );
281    //            throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
282    
283              tailNames.add( tailName );
284              tails.add( tail );
285              }
286            }
287          else
288            {
289            String tailName = pipe.getName();
290    
291            if( !tapNames.contains( tailName ) )
292              throw new PlannerException( pipe, "pipe name not found in either sink or source map: '" + tailName + "'" );
293    
294            if( tailNames.contains( tailName ) && !tails.contains( pipe ) )
295              LOG.warn( "duplicate tail name found: '{}'", tailName );
296    //            throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
297    
298            tailNames.add( tailName );
299            tails.add( pipe );
300            }
301          }
302    
303    //    Set<String> allTailNames = new HashSet<String>( tailNames );
304        tailNames.removeAll( flowDef.getSinks().keySet() );
305        Set<String> remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() );
306        remainingSinks.removeAll( tailNames );
307    
308        if( tailNames.size() != 0 )
309          throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + Util.join( Util.quote( tailNames, "'" ), ", " ) + "], remaining sink tap names: [" + Util.join( Util.quote( remainingSinks, "'" ), ", " ) + "]" );
310    
311        // unlike heads, pipes can input to another pipe and simultaneously be a sink
312        // so there is no way to know all the intentional tails, so they aren't listed below in the exception
313        remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() );
314        remainingSinks.removeAll( asList( Pipe.names( flowTails ) ) );
315    
316        if( remainingSinks.size() != 0 )
317          throw new PlannerException( "not all sink taps bound to tail pipes, remaining sink tap names: [" + Util.join( Util.quote( remainingSinks, "'" ), ", " ) + "]" );
318    
319        // handle heads
320        Set<Pipe> heads = new HashSet<Pipe>();
321        Set<String> headNames = new HashSet<String>();
322    
323        for( Pipe pipe : flowTails )
324          {
325          for( Pipe head : pipe.getHeads() )
326            {
327            String headName = head.getName();
328    
329            if( !tapNames.contains( headName ) )
330              throw new PlannerException( head, "pipe name not found in either sink or source map: '" + headName + "'" );
331    
332            if( headNames.contains( headName ) && !heads.contains( head ) )
333              LOG.warn( "duplicate head name found, not an error but heads should have unique names: '{}'", headName );
334    //          throw new PlannerException( pipe, "duplicate head name found: " + headName );
335    
336            headNames.add( headName );
337            heads.add( head );
338            }
339          }
340    
341        Set<String> allHeadNames = new HashSet<String>( headNames );
342        headNames.removeAll( flowDef.getSources().keySet() );
343        Set<String> remainingSources = new HashSet<String>( flowDef.getSources().keySet() );
344        remainingSources.removeAll( headNames );
345    
346        if( headNames.size() != 0 )
347          throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + Util.join( Util.quote( headNames, "'" ), ", " ) + "], remaining source tap names: [" + Util.join( Util.quote( remainingSources, "'" ), ", " ) + "]" );
348    
349        remainingSources = new HashSet<String>( flowDef.getSources().keySet() );
350        remainingSources.removeAll( allHeadNames );
351    
352        if( remainingSources.size() != 0 )
353          throw new PlannerException( "not all source taps bound to head pipes, remaining source tap names: [" + Util.join( Util.quote( remainingSources, "'" ), ", " ) + "], remaining head pipe names: [" + Util.join( Util.quote( headNames, "'" ), ", " ) + "]" );
354    
355        }
356    
357      protected void verifyTraps( FlowDef flowDef, Pipe[] flowTails )
358        {
359        verifyNotSourcesSinks( flowDef.getTraps(), flowDef.getSources(), flowDef.getSinks(), "trap" );
360    
361        Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );
362    
363        for( String name : flowDef.getTraps().keySet() )
364          {
365          if( !names.contains( name ) )
366            throw new PlannerException( "trap name not found in assembly: '" + name + "'" );
367          }
368        }
369    
370      protected void verifyCheckpoints( FlowDef flowDef, Pipe[] flowTails )
371        {
372        verifyNotSourcesSinks( flowDef.getCheckpoints(), flowDef.getSources(), flowDef.getSinks(), "checkpoint" );
373    
374        for( Tap checkpointTap : flowDef.getCheckpoints().values() )
375          {
376          Scheme scheme = checkpointTap.getScheme();
377    
378          if( scheme.getSourceFields().equals( Fields.UNKNOWN ) && scheme.getSinkFields().equals( Fields.ALL ) )
379            continue;
380    
381          throw new PlannerException( "checkpoint tap scheme must be undeclared, source fields must be UNKNOWN, and sink fields ALL, got: " + scheme.toString() );
382          }
383    
384        Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );
385    
386        for( String name : flowDef.getCheckpoints().keySet() )
387          {
388          if( !names.contains( name ) )
389            throw new PlannerException( "named checkpoint declared in FlowDef, but no named branch found in pipe assembly: '" + name + "'" );
390    
391          Set<Pipe> pipes = new HashSet<Pipe>( asList( Pipe.named( name, flowTails ) ) );
392    
393          int count = 0;
394    
395          for( Pipe pipe : pipes )
396            {
397            if( pipe instanceof Checkpoint )
398              count++;
399            }
400    
401          if( count == 0 )
402            throw new PlannerException( "no checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );
403    
404          if( count > 1 )
405            throw new PlannerException( "more than one checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );
406          }
407        }
408    
409      private void verifyNotSourcesSinks( Map<String, Tap> taps, Map<String, Tap> sources, Map<String, Tap> sinks, String role )
410        {
411        Collection<Tap> sourceTaps = sources.values();
412        Collection<Tap> sinkTaps = sinks.values();
413    
414        for( Tap tap : taps.values() )
415          {
416          if( sourceTaps.contains( tap ) )
417            throw new PlannerException( "tap may not be used as both a " + role + " and a source in the same Flow: " + tap );
418    
419          if( sinkTaps.contains( tap ) )
420            throw new PlannerException( "tap may not be used as both a " + role + " and a sink in the same Flow: " + tap );
421          }
422        }
423    
424      /**
425       * Verifies that there are not only GroupAssertions following any given Group instance. This will adversely
426       * affect the stream entering any subsequent Tap of Each instances.
427       */
428      protected void failOnLoneGroupAssertion( ElementGraph elementGraph )
429        {
430        List<Group> groups = elementGraph.findAllGroups();
431    
432        // walk Every instances after Group
433        for( Group group : groups )
434          {
435          for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsFrom( group ) )
436            {
437            List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is tail
438    
439            int everies = 0;
440            int assertions = 0;
441    
442            for( FlowElement flowElement : flowElements )
443              {
444              if( flowElement instanceof Group )
445                continue;
446    
447              if( !( flowElement instanceof Every ) )
448                break;
449    
450              everies++;
451    
452              Every every = (Every) flowElement;
453    
454              if( every.getPlannerLevel() != null )
455                assertions++;
456              }
457    
458            if( everies != 0 && everies == assertions )
459              throw new PlannerException( "group assertions must be accompanied by aggregator operations" );
460            }
461          }
462        }
463    
464      protected void failOnMissingGroup( ElementGraph elementGraph )
465        {
466        List<Every> everies = elementGraph.findAllEveries();
467    
468        // walk Every instances after Group
469        for( Every every : everies )
470          {
471          for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( every ) )
472            {
473            List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is every
474            Collections.reverse( flowElements ); // first element is every
475    
476            for( FlowElement flowElement : flowElements )
477              {
478              if( flowElement instanceof Every || flowElement.getClass() == Pipe.class )
479                continue;
480    
481              if( flowElement instanceof GroupBy || flowElement instanceof CoGroup )
482                break;
483    
484              throw new PlannerException( (Pipe) flowElement, "Every may only be preceded by another Every or a Group pipe, found: " + flowElement );
485              }
486            }
487          }
488        }
489    
490      protected void failOnMisusedBuffer( ElementGraph elementGraph )
491        {
492        List<Every> everies = elementGraph.findAllEveries();
493    
494        // walk Every instances after Group
495        for( Every every : everies )
496          {
497          for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( every ) )
498            {
499            List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is every
500            Collections.reverse( flowElements ); // first element is every
501    
502            Every last = null;
503            boolean foundBuffer = false;
504            int foundEveries = -1;
505    
506            for( FlowElement flowElement : flowElements )
507              {
508              if( flowElement instanceof Each )
509                throw new PlannerException( (Pipe) flowElement, "Every may only be preceded by another Every or a GroupBy or CoGroup pipe, found: " + flowElement );
510    
511              if( flowElement instanceof Every )
512                {
513                foundEveries++;
514    
515                boolean isBuffer = ( (Every) flowElement ).isBuffer();
516    
517                if( foundEveries != 0 && ( isBuffer || foundBuffer ) )
518                  throw new PlannerException( (Pipe) flowElement, "Only one Every with a Buffer may follow a GroupBy or CoGroup pipe, no other Every instances are allowed immediately before or after, found: " + flowElement + " before: " + last );
519    
520                if( !foundBuffer )
521                  foundBuffer = isBuffer;
522    
523                last = (Every) flowElement;
524                }
525    
526              if( flowElement instanceof Group )
527                break;
528              }
529            }
530          }
531        }
532    
533      protected void failOnGroupEverySplit( ElementGraph elementGraph )
534        {
535        List<Group> groups = new ArrayList<Group>();
536    
537        elementGraph.findAllOfType( 1, 2, Group.class, groups );
538    
539        for( Group group : groups )
540          {
541          Set<FlowElement> children = elementGraph.getAllChildrenNotExactlyType( group, Pipe.class );
542    
543          for( FlowElement flowElement : children )
544            {
545            if( flowElement instanceof Every )
546              throw new PlannerException( (Every) flowElement, "Every instances may not split after a GroupBy or CoGroup pipe, found: " + flowElement + " after: " + group );
547            }
548          }
549        }
550    
551      protected PlannerException handleExceptionDuringPlanning( Exception exception, ElementGraph elementGraph )
552        {
553        if( exception instanceof PlannerException )
554          {
555          ( (PlannerException) exception ).elementGraph = elementGraph;
556    
557          return (PlannerException) exception;
558          }
559        else if( exception instanceof ElementGraphException )
560          {
561          Throwable cause = exception.getCause();
562    
563          if( cause == null )
564            cause = exception;
565    
566          // captures pipegraph for debugging
567          // forward message in case cause or trace is lost
568          String message = "could not build flow from assembly";
569    
570          if( cause.getMessage() != null )
571            message = String.format( "%s: [%s]", message, cause.getMessage() );
572    
573          if( cause instanceof OperatorException )
574            return new PlannerException( message, cause, elementGraph );
575    
576          if( cause instanceof TapException )
577            return new PlannerException( message, cause, elementGraph );
578    
579          return new PlannerException( ( (ElementGraphException) exception ).getPipe(), message, cause, elementGraph );
580          }
581        else
582          {
583          // captures pipegraph for debugging
584          // forward message in case cause or trace is lost
585          String message = "could not build flow from assembly";
586    
587          if( exception.getMessage() != null )
588            message = String.format( "%s: [%s]", message, exception.getMessage() );
589    
590          return new PlannerException( message, exception, elementGraph );
591          }
592        }
593    
594      protected void handleNonSafeOperations( ElementGraph elementGraph )
595        {
596        // if there was a graph change, iterate paths again.
597        while( !internalNonSafeOperations( elementGraph ) )
598          ;
599        }
600    
601      private boolean internalNonSafeOperations( ElementGraph elementGraph )
602        {
603        Set<Pipe> tapInsertions = new HashSet<Pipe>();
604    
605        List<Pipe> splits = elementGraph.findAllPipeSplits();
606    
607        // if any predecessor is unsafe, insert temp
608        for( Pipe split : splits )
609          {
610          List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsTo( split );
611    
612          for( GraphPath<FlowElement, Scope> path : paths )
613            {
614            List<FlowElement> elements = Graphs.getPathVertexList( path );
615            Collections.reverse( elements );
616    
617            for( FlowElement element : elements )
618              {
619              if( !( element instanceof Each ) && element.getClass() != Pipe.class )
620                break;
621    
622              if( element.getClass() == Pipe.class )
623                continue;
624    
625              if( !( (Each) element ).getOperation().isSafe() )
626                {
627                tapInsertions.add( split );
628                break;
629                }
630              }
631            }
632          }
633    
634        for( Pipe pipe : tapInsertions )
635          insertTempTapAfter( elementGraph, pipe );
636    
637        return tapInsertions.isEmpty();
638        }
639    
640      /**
641       * Method insertTapAfter ...
642       *
643       * @param graph of type PipeGraph
644       * @param pipe  of type Pipe
645       */
646      protected void insertTempTapAfter( ElementGraph graph, Pipe pipe )
647        {
648        LOG.debug( "inserting tap after: {}", pipe );
649    
650        Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() );
651    
652        if( checkpointTap != null )
653          {
654          LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap );
655          checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS );
656          }
657    
658        if( checkpointTap == null )
659          {
660          // only restart from a checkpoint pipe or checkpoint tap below
661          if( pipe instanceof Checkpoint )
662            {
663            checkpointTap = makeTempTap( checkpointRootPath, pipe.getName() );
664            checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS );
665            // mark as an anonymous checkpoint
666            checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
667            }
668          else
669            {
670            checkpointTap = makeTempTap( pipe.getName() );
671            }
672          }
673    
674        checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS );
675    
676        graph.insertFlowElementAfter( pipe, checkpointTap );
677        }
678    
679      private Tap decorateTap( Pipe pipe, Tap tempTap, String decoratorClass )
680        {
681        String decoratorClassName = PropertyUtil.getProperty( properties, pipe, decoratorClass );
682    
683        if( Util.isEmpty( decoratorClassName ) )
684          return tempTap;
685    
686        LOG.info( "found decorator: {}, wrapping tap: {}", decoratorClass, tempTap );
687    
688        tempTap = Util.newInstance( decoratorClassName, tempTap );
689    
690        return tempTap;
691        }
692    
693      protected Tap makeTempTap( String name )
694        {
695        return makeTempTap( null, name );
696        }
697    
  /**
   * Method makeTempTap creates a platform specific temporary Tap.
   *
   * @param prefix of type String, null when called through {@link #makeTempTap(String)}, the
   *               checkpoint root path when called for a Checkpoint pipe
   * @param name   of type String
   * @return Tap
   */
  protected abstract Tap makeTempTap( String prefix, String name );
699    
700      /**
701       * Inserts a temporary Tap between logical MR jobs.
702       * <p/>
703       * Since all joins are at groups or splices, depth first search is safe
704       * <p/>
705       * todo: refactor so that rules are applied to path segments bounded by taps
706       * todo: this would allow balancing of operations within paths instead of pushing
707       * todo: all operations up. may allow for consolidation of rules
708       *
709       * @param elementGraph of type PipeGraph
710       */
711      protected void handleJobPartitioning( ElementGraph elementGraph )
712        {
713        // if there was a graph change, iterate paths again. prevents many temp taps from being inserted in front of a group
714        while( !internalJobPartitioning( elementGraph ) )
715          ;
716        }
717    
718      private boolean internalJobPartitioning( ElementGraph elementGraph )
719        {
720        for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsBetweenExtents() )
721          {
722          List<FlowElement> flowElements = Graphs.getPathVertexList( path );
723          List<Pipe> tapInsertions = new ArrayList<Pipe>();
724    
725          boolean foundGroup = false;
726    
727          for( int i = 0; i < flowElements.size(); i++ )
728            {
729            FlowElement flowElement = flowElements.get( i );
730    
731            if( flowElement instanceof ElementGraph.Extent ) // is an extent: head or tail
732              continue;
733            else if( flowElement instanceof Tap && flowElements.get( i - 1 ) instanceof ElementGraph.Extent )  // is a source tap
734              continue;
735    
736            if( flowElement instanceof Group && !foundGroup )
737              {
738              foundGroup = true;
739              }
740            else if( flowElement instanceof Splice && foundGroup ) // add tap between groups, push joins/merge map side
741              {
742              tapInsertions.add( (Pipe) flowElements.get( i - 1 ) );
743    
744              if( !( flowElement instanceof Group ) )
745                foundGroup = false;
746              }
747            else if( flowElement instanceof Checkpoint ) // add tap after checkpoint
748              {
749              if( flowElements.get( i + 1 ) instanceof Tap ) // don't keep inserting
750                continue;
751    
752              tapInsertions.add( (Pipe) flowElement );
753              foundGroup = false;
754              }
755            else if( flowElement instanceof Tap )
756              {
757              foundGroup = false;
758              }
759            }
760    
761          for( Pipe pipe : tapInsertions )
762            insertTempTapAfter( elementGraph, pipe );
763    
764          if( !tapInsertions.isEmpty() )
765            return false;
766          }
767    
768        return true;
769        }
770    
771      /**
772       * Prevent leftmost sources from sourcing a downstream join on the rightmost side intra-task by inserting a
773       * temp tap between the left-sourced join and right-sourced join.
774       *
775       * @param elementGraph
776       */
777      protected void handleJoins( ElementGraph elementGraph )
778        {
779        while( !internalJoins( elementGraph ) )
780          ;
781        }
782    
783      private boolean internalJoins( ElementGraph elementGraph )
784        {
785        List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsBetweenExtents();
786    
787        // large to small
788        Collections.reverse( paths );
789    
790        for( GraphPath<FlowElement, Scope> path : paths )
791          {
792          List<FlowElement> flowElements = Graphs.getPathVertexList( path );
793          List<Pipe> tapInsertions = new ArrayList<Pipe>();
794          List<HashJoin> joins = new ArrayList<HashJoin>();
795          List<Merge> merges = new ArrayList<Merge>();
796    
797          FlowElement lastSourceElement = null;
798    
799          for( int i = 0; i < flowElements.size(); i++ )
800            {
801            FlowElement flowElement = flowElements.get( i );
802    
803            if( flowElement instanceof Merge )
804              {
805              merges.add( (Merge) flowElement );
806              }
807            else if( flowElement instanceof HashJoin )
808              {
809              HashJoin join = (HashJoin) flowElement;
810    
811              Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true );
812    
813              // is this path streamed
814              int pathPosition = pathPositionInto( path, join );
815              boolean thisPathIsStreamed = pathPosition == 0;
816    
817              boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths
818              int pathCount = countPaths( pathCounts );
819    
820              int priorJoins = countTypesBetween( elementGraph, lastSourceElement, join, HashJoin.class );
821    
822              if( priorJoins == 0 )
823                {
824                // if same source is leading into the hashjoin, insert tap on the accumulated side
825                if( pathCount == 2 && isAccumulatedAndStreamed && !thisPathIsStreamed )
826                  {
827                  tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
828                  break;
829                  }
830    
831                // if more than one path into streamed and accumulated branches, insert tap on streamed side
832                if( pathCount > 2 && isAccumulatedAndStreamed && thisPathIsStreamed )
833                  {
834                  tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
835                  break;
836                  }
837                }
838    
839              if( !merges.isEmpty() )
840                {
841                // if a Merge is prior to a HashJoin, and its an accumulated path, force Merge results to disk
842                int joinPos = flowElements.indexOf( join );
843                int mergePos = nearest( flowElements, joinPos, merges );
844    
845                if( mergePos != -1 && joinPos > mergePos )
846                  {
847                  // if all paths are accumulated and streamed, insert
848                  // else if just if this path is accumulated
849                  if( ( isAccumulatedAndStreamed && thisPathIsStreamed ) || !thisPathIsStreamed )
850                    {
851                    tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
852                    break;
853                    }
854                  }
855                }
856    
857              joins.add( (HashJoin) flowElement );
858              }
859            else if( flowElement instanceof Tap || flowElement instanceof Group )
860              {
861              // added for JoinFieldedPipesPlatformTest.testJoinMergeGroupBy where Merge hides streamed nature of path
862              if( flowElement instanceof Group && !joins.isEmpty() )
863                {
864                List<Splice> splices = new ArrayList<Splice>();
865    
866                splices.addAll( merges );
867                splices.add( (Splice) flowElement );
868    
869                Collections.reverse( splices );
870    
871                for( Splice splice : splices )
872                  {
873                  Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, splice, true );
874    
875                  if( isBothAccumulatedAndStreamedPath( pathCounts ) )
876                    {
877                    tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( splice ) - 1 ) );
878                    break;
879                    }
880                  }
881    
882                if( !tapInsertions.isEmpty() )
883                  break;
884                }
885    
886              for( int j = 0; j < joins.size(); j++ )
887                {
888                HashJoin join = joins.get( j );
889    
890                int pathPosition = pathPositionInto( path, join );
891                boolean thisPathIsStreamed = pathPosition == 0;
892    
893                Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true );
894    
895                boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths
896                int pathCount = countPaths( pathCounts );
897    
898                if( pathCount >= 2 && isAccumulatedAndStreamed && thisPathIsStreamed )
899                  {
900                  tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
901                  break;
902                  }
903    
904                if( thisPathIsStreamed )
905                  continue;
906    
907                if( j == 0 ) // is accumulated on first join
908                  break;
909    
910                // prevent a streamed path from being accumulated by injecting a tap before the
911                // current HashJoin
912                tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
913                break;
914                }
915    
916              if( !tapInsertions.isEmpty() )
917                break;
918    
919              lastSourceElement = flowElement;
920              merges.clear();
921              joins.clear();
922              }
923            }
924    
925          for( Pipe pipe : tapInsertions )
926            insertTempTapAfter( elementGraph, pipe );
927    
928          if( !tapInsertions.isEmpty() )
929            return false;
930          }
931    
932        return true;
933        }
934    
935      private int nearest( List<FlowElement> flowElements, int index, List<Merge> merges )
936        {
937        List<Merge> reversed = new ArrayList<Merge>( merges );
938        Collections.reverse( reversed );
939    
940        for( Merge merge : reversed )
941          {
942          int pos = flowElements.indexOf( merge );
943          if( pos < index )
944            return pos;
945          }
946    
947        return -1;
948        }
949      }