001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner;
022
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.TreeSet;
031
032import cascading.flow.AssemblyPlanner;
033import cascading.flow.BaseFlow;
034import cascading.flow.Flow;
035import cascading.flow.FlowConnector;
036import cascading.flow.FlowConnectorProps;
037import cascading.flow.FlowDef;
038import cascading.flow.FlowElement;
039import cascading.flow.FlowStep;
040import cascading.flow.Flows;
041import cascading.flow.planner.graph.ElementGraph;
042import cascading.flow.planner.graph.FlowElementGraph;
043import cascading.flow.planner.process.FlowNodeFactory;
044import cascading.flow.planner.process.FlowNodeGraph;
045import cascading.flow.planner.process.FlowStepFactory;
046import cascading.flow.planner.process.FlowStepGraph;
047import cascading.flow.planner.rule.ProcessLevel;
048import cascading.flow.planner.rule.RuleRegistry;
049import cascading.flow.planner.rule.RuleRegistrySet;
050import cascading.flow.planner.rule.RuleResult;
051import cascading.flow.planner.rule.RuleSetExec;
052import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory;
053import cascading.flow.planner.rule.util.TraceWriter;
054import cascading.operation.AssertionLevel;
055import cascading.operation.DebugLevel;
056import cascading.pipe.Checkpoint;
057import cascading.pipe.OperatorException;
058import cascading.pipe.Pipe;
059import cascading.pipe.SubAssembly;
060import cascading.property.ConfigDef;
061import cascading.property.PropertyUtil;
062import cascading.scheme.Scheme;
063import cascading.tap.Tap;
064import cascading.tap.TapException;
065import cascading.tuple.Fields;
066import cascading.util.Update;
067import cascading.util.Util;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import static cascading.util.Util.*;
072import static java.util.Arrays.asList;
073
074/**
075 * Class FlowPlanner is the base class for all planner implementations.
076 * <p/>
077 * This planner support tracing execution of each rule. See the appropriate properties on this
078 * class to enable.
079 */
080public abstract class FlowPlanner<F extends BaseFlow, Config>
081  {
082  /**
083   * Enables the planner to write out basic planner information including the initial element-graph,
084   * completed element-graph, and the completed step-graph dot files.
085   */
086  public static final String TRACE_PLAN_PATH = "cascading.planner.plan.path";
087
088  /**
089   * Enables the planner to write out detail level planner information for each rule, including recursive
090   * transforms.
091   * <p/>
092   * Use this to debug rules. This does increase overhead during planning.
093   */
094  public static final String TRACE_PLAN_TRANSFORM_PATH = "cascading.planner.plan.transforms.path";
095
096  /**
097   * Enables the planner to write out planner statistics for each planner phase and rule.
098   */
099  public static final String TRACE_STATS_PATH = "cascading.planner.stats.path";
100
101  /** Field LOG */
102  private static final Logger LOG = LoggerFactory.getLogger( FlowPlanner.class );
103
104  /** Field properties */
105  protected Map<Object, Object> defaultProperties;
106
107  protected String checkpointTapRootPath = null;
108
109  /** Field assertionLevel */
110  protected AssertionLevel defaultAssertionLevel;
111  /** Field debugLevel */
112  protected DebugLevel defaultDebugLevel;
113
114  /**
115   * Method getAssertionLevel returns the configured target planner {@link cascading.operation.AssertionLevel}.
116   *
117   * @param properties of type Map<Object, Object>
118   * @return AssertionLevel the configured AssertionLevel
119   */
120  static AssertionLevel getAssertionLevel( Map<Object, Object> properties )
121    {
122    String assertionLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.assertionlevel", AssertionLevel.STRICT.name() );
123
124    return AssertionLevel.valueOf( assertionLevel );
125    }
126
127  /**
128   * Method getDebugLevel returns the configured target planner {@link cascading.operation.DebugLevel}.
129   *
130   * @param properties of type Map<Object, Object>
131   * @return DebugLevel the configured DebugLevel
132   */
133  static DebugLevel getDebugLevel( Map<Object, Object> properties )
134    {
135    String debugLevel = PropertyUtil.getProperty( properties, "cascading.flowconnector.debuglevel", DebugLevel.DEFAULT.name() );
136
137    return DebugLevel.valueOf( debugLevel );
138    }
139
140  {
141  Update.registerPlanner( getClass() );
142  }
143
144  public Map<Object, Object> getDefaultProperties()
145    {
146    return defaultProperties;
147    }
148
149  public abstract Config getDefaultConfig();
150
151  public abstract PlannerInfo getPlannerInfo( String name );
152
153  public abstract PlatformInfo getPlatformInfo();
154
155  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
156    {
157    this.defaultProperties = properties;
158    this.defaultAssertionLevel = getAssertionLevel( properties );
159    this.defaultDebugLevel = getDebugLevel( properties );
160    }
161
162  public F buildFlow( FlowDef flowDef, RuleRegistrySet ruleRegistrySet )
163    {
164    FlowElementGraph flowElementGraph = null;
165
166    try
167      {
168      flowDef = normalizeTaps( flowDef );
169
170      verifyAllTaps( flowDef );
171
172      F flow = createFlow( flowDef );
173
174      Pipe[] tails = resolveTails( flowDef, flow );
175
176      verifyAssembly( flowDef, tails );
177
178      flowElementGraph = createFlowElementGraph( flowDef, tails );
179
180      TraceWriter traceWriter = new TraceWriter( flow );
181      RuleSetExec ruleSetExec = new RuleSetExec( traceWriter, this, flow, ruleRegistrySet, flowDef, flowElementGraph );
182
183      RuleResult ruleResult = ruleSetExec.exec();
184
185      traceWriter.writeTracePlan( null, "0-initial-flow-element-graph", flowElementGraph );
186
187      FlowElementGraph finalFlowElementGraph = ruleResult.getAssemblyGraph();
188
189      finalFlowElementGraph = flow.updateSchemes( finalFlowElementGraph );
190
191      Map<ElementGraph, List<? extends ElementGraph>> stepToNodes = ruleResult.getStepToNodeGraphMap();
192      Map<ElementGraph, List<? extends ElementGraph>> nodeToPipeline = ruleResult.getNodeToPipelineGraphMap();
193
194      FlowStepGraph flowStepGraph = new FlowStepGraph( getFlowStepFactory(), finalFlowElementGraph, stepToNodes, nodeToPipeline );
195
196      traceWriter.writeFinal( "1-final-flow-registry", ruleResult );
197      traceWriter.writeTracePlan( null, "2-final-flow-element-graph", finalFlowElementGraph );
198      traceWriter.writeTracePlan( null, "3-final-flow-step-graph", flowStepGraph );
199      traceWriter.writeTracePlanSteps( "4-final-flow-steps", flowStepGraph );
200
201      flow.setPlannerInfo( getPlannerInfo( ruleResult.getRegistry().getName() ) );
202
203      flow.initialize( finalFlowElementGraph, flowStepGraph );
204
205      return flow;
206      }
207    catch( Exception exception )
208      {
209      throw handleExceptionDuringPlanning( flowDef, exception, flowElementGraph );
210      }
211    }
212
213  protected abstract F createFlow( FlowDef flowDef );
214
215  public abstract FlowStepFactory<Config> getFlowStepFactory();
216
217  public FlowNodeFactory getFlowNodeFactory()
218    {
219    return new BaseFlowNodeFactory();
220    }
221
222  public void configRuleRegistryDefaults( RuleRegistry ruleRegistry )
223    {
224
225    }
226
227  protected Pipe[] resolveTails( FlowDef flowDef, F flow )
228    {
229    Pipe[] tails = flowDef.getTailsArray();
230
231    tails = resolveAssemblyPlanners( flowDef, flow, tails );
232
233    return tails;
234    }
235
236  protected Pipe[] resolveAssemblyPlanners( FlowDef flowDef, Flow flow, Pipe[] pipes )
237    {
238    List<Pipe> tails = Arrays.asList( pipes );
239
240    List<AssemblyPlanner> assemblyPlanners = flowDef.getAssemblyPlanners();
241
242    for( AssemblyPlanner assemblyPlanner : assemblyPlanners )
243      {
244      tails = assemblyPlanner.resolveTails( new AssemblyPlannerContext( flowDef, flow, tails ) );
245
246      if( tails.isEmpty() )
247        throw new PlannerException( "assembly planner: " + assemblyPlanner + ", returned zero tails" );
248
249      tails = Collections.unmodifiableList( tails );
250      }
251
252    return tails.toArray( new Pipe[ tails.size() ] );
253    }
254
255  protected void verifyAssembly( FlowDef flowDef, Pipe[] tails )
256    {
257    verifyPipeAssemblyEndPoints( flowDef, tails );
258    verifyTraps( flowDef, tails );
259    verifyCheckpoints( flowDef, tails );
260    }
261
262  protected void verifyAllTaps( FlowDef flowDef )
263    {
264    verifySourceNotSinks( flowDef.getSources(), flowDef.getSinks() );
265
266    verifyTaps( flowDef.getSources(), true, true );
267    verifyTaps( flowDef.getSinks(), false, true );
268    verifyTaps( flowDef.getTraps(), false, false );
269
270    // are both sources and sinks
271    verifyTaps( flowDef.getCheckpoints(), true, false );
272    verifyTaps( flowDef.getCheckpoints(), false, false );
273    }
274
275  protected FlowElementGraph createFlowElementGraph( FlowDef flowDef, Pipe[] flowTails )
276    {
277    Map<String, Tap> sources = flowDef.getSourcesCopy();
278    Map<String, Tap> sinks = flowDef.getSinksCopy();
279    Map<String, Tap> traps = flowDef.getTrapsCopy();
280    Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();
281
282    checkpointTapRootPath = makeCheckpointRootPath( flowDef );
283
284    return new FlowElementGraph( getPlatformInfo(), flowTails, sources, sinks, traps, checkpoints, checkpointTapRootPath != null );
285    }
286
287  private FlowDef normalizeTaps( FlowDef flowDef )
288    {
289    Set<Tap> taps = new HashSet<>();
290
291    Map<String, Tap> sources = flowDef.getSourcesCopy();
292    Map<String, Tap> sinks = flowDef.getSinksCopy();
293    Map<String, Tap> traps = flowDef.getTrapsCopy();
294    Map<String, Tap> checkpoints = flowDef.getCheckpointsCopy();
295
296    boolean sourcesHasDupes = addTaps( sources, taps );
297    boolean sinksHasDupes = addTaps( sinks, taps );
298    boolean trapsHasDupes = addTaps( traps, taps );
299    boolean checkpointsHasDupes = addTaps( checkpoints, taps );
300
301    if( sourcesHasDupes )
302      normalize( taps, sources );
303
304    if( sinksHasDupes )
305      normalize( taps, sinks );
306
307    if( trapsHasDupes )
308      normalize( taps, traps );
309
310    if( checkpointsHasDupes )
311      normalize( taps, checkpoints );
312
313    return Flows.copy( flowDef, sources, sinks, traps, checkpoints );
314    }
315
316  private boolean addTaps( Map<String, Tap> current, Set<Tap> taps )
317    {
318    int size = taps.size();
319
320    taps.addAll( current.values() );
321
322    // if all the added values are not unique, taps.size will be less than original size + num tap instances
323    return size + current.size() != taps.size();
324    }
325
326  private void normalize( Set<Tap> taps, Map<String, Tap> current )
327    {
328    for( Tap tap : taps )
329      {
330      for( Map.Entry<String, Tap> entry : current.entrySet() )
331        {
332        if( entry.getValue().equals( tap ) ) // force equivalent instance to being the same instance
333          entry.setValue( tap );
334        }
335      }
336    }
337
338  private String makeCheckpointRootPath( FlowDef flowDef )
339    {
340    String flowName = flowDef.getName();
341    String runID = flowDef.getRunID();
342
343    if( runID == null )
344      return null;
345
346    if( flowName == null )
347      throw new PlannerException( "flow name is required when providing a run id" );
348
349    return flowName + "/" + runID;
350    }
351
352  protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks )
353    {
354    Collection<Tap> sourcesSet = sources.values();
355
356    for( Tap tap : sinks.values() )
357      {
358      if( sourcesSet.contains( tap ) )
359        throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap );
360      }
361    }
362
363  /**
364   * Method verifyTaps ...
365   *
366   * @param taps          of type Map<String, Tap>
367   * @param areSources    of type boolean
368   * @param mayNotBeEmpty of type boolean
369   */
370  protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean mayNotBeEmpty )
371    {
372    if( mayNotBeEmpty && taps.isEmpty() )
373      throw new PlannerException( ( areSources ? "source" : "sink" ) + " taps are required" );
374
375    for( String tapName : taps.keySet() )
376      {
377      if( areSources && !taps.get( tapName ).isSource() )
378        throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a source: " + taps.get( tapName ) );
379      else if( !areSources && !taps.get( tapName ).isSink() )
380        throw new PlannerException( "tap named: '" + tapName + "', cannot be used as a sink: " + taps.get( tapName ) );
381      }
382    }
383
384  /**
385   * Method verifyEndPoints verifies
386   * <p/>
387   * there aren't dupe names in heads or tails.
388   * all the sink and source tap names match up with tail and head pipes
389   */
390  // todo: force dupe names to throw exceptions
391  protected void verifyPipeAssemblyEndPoints( FlowDef flowDef, Pipe[] flowTails )
392    {
393    Set<String> tapNames = new HashSet<String>();
394
395    tapNames.addAll( flowDef.getSources().keySet() );
396    tapNames.addAll( flowDef.getSinks().keySet() );
397
398    // handle tails
399    Set<Pipe> tails = new HashSet<Pipe>();
400    Set<String> tailNames = new HashSet<String>();
401
402    for( Pipe pipe : flowTails )
403      {
404      if( pipe instanceof SubAssembly )
405        {
406        for( Pipe tail : ( (SubAssembly) pipe ).getTails() )
407          {
408          String tailName = tail.getName();
409
410          if( !tapNames.contains( tailName ) )
411            throw new PlannerException( tail, "pipe name not found in either sink or source map: '" + tailName + "'" );
412
413          if( tailNames.contains( tailName ) && !tails.contains( tail ) )
414            throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
415
416          tailNames.add( tailName );
417          tails.add( tail );
418          }
419        }
420      else
421        {
422        String tailName = pipe.getName();
423
424        if( !tapNames.contains( tailName ) )
425          throw new PlannerException( pipe, "pipe name not found in either sink or source map: '" + tailName + "'" );
426
427        if( tailNames.contains( tailName ) && !tails.contains( pipe ) )
428          throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
429
430        tailNames.add( tailName );
431        tails.add( pipe );
432        }
433      }
434
435    tailNames.removeAll( flowDef.getSinks().keySet() );
436    Set<String> remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() );
437    remainingSinks.removeAll( tailNames );
438
439    if( tailNames.size() != 0 )
440      throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + join( quote( tailNames, "'" ), ", " ) + "], remaining sink tap names: [" + join( quote( remainingSinks, "'" ), ", " ) + "]" );
441
442    // unlike heads, pipes can input to another pipe and simultaneously be a sink
443    // so there is no way to know all the intentional tails, so they aren't listed below in the exception
444    remainingSinks = new HashSet<String>( flowDef.getSinks().keySet() );
445    remainingSinks.removeAll( asList( Pipe.names( flowTails ) ) );
446
447    if( remainingSinks.size() != 0 )
448      throw new PlannerException( "not all sink taps bound to tail pipes, remaining sink tap names: [" + join( quote( remainingSinks, "'" ), ", " ) + "]" );
449
450    // handle heads
451    Set<Pipe> heads = new HashSet<Pipe>();
452    Set<String> headNames = new HashSet<String>();
453
454    for( Pipe pipe : flowTails )
455      {
456      for( Pipe head : pipe.getHeads() )
457        {
458        String headName = head.getName();
459
460        if( !tapNames.contains( headName ) )
461          throw new PlannerException( head, "pipe name not found in either sink or source map: '" + headName + "'" );
462
463        if( headNames.contains( headName ) && !heads.contains( head ) )
464          LOG.warn( "duplicate head name found, not an error but heads should have unique names: '{}'", headName );
465
466        headNames.add( headName );
467        heads.add( head );
468        }
469      }
470
471    Set<String> allHeadNames = new HashSet<String>( headNames );
472    headNames.removeAll( flowDef.getSources().keySet() );
473    Set<String> remainingSources = new HashSet<String>( flowDef.getSources().keySet() );
474    remainingSources.removeAll( headNames );
475
476    if( headNames.size() != 0 )
477      throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + join( quote( headNames, "'" ), ", " ) + "], remaining source tap names: [" + join( quote( remainingSources, "'" ), ", " ) + "]" );
478
479    remainingSources = new HashSet<String>( flowDef.getSources().keySet() );
480    remainingSources.removeAll( allHeadNames );
481
482    if( remainingSources.size() != 0 )
483      throw new PlannerException( "not all source taps bound to head pipes, remaining source tap names: [" + join( quote( remainingSources, "'" ), ", " ) + "], remaining head pipe names: [" + join( quote( headNames, "'" ), ", " ) + "]" );
484
485    }
486
487  protected void verifyTraps( FlowDef flowDef, Pipe[] flowTails )
488    {
489    verifyNotSourcesSinks( flowDef.getTraps(), flowDef.getSources(), flowDef.getSinks(), "trap" );
490
491    Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );
492
493    for( String name : flowDef.getTraps().keySet() )
494      {
495      if( !names.contains( name ) )
496        throw new PlannerException( "trap name not found in assembly: '" + name + "'" );
497      }
498    }
499
500  protected void verifyCheckpoints( FlowDef flowDef, Pipe[] flowTails )
501    {
502    verifyNotSourcesSinks( flowDef.getCheckpoints(), flowDef.getSources(), flowDef.getSinks(), "checkpoint" );
503
504    for( Tap checkpointTap : flowDef.getCheckpoints().values() )
505      {
506      Scheme scheme = checkpointTap.getScheme();
507
508      if( scheme.getSourceFields().equals( Fields.UNKNOWN ) && scheme.getSinkFields().equals( Fields.ALL ) )
509        continue;
510
511      throw new PlannerException( "checkpoint tap scheme must be undeclared, source fields must be UNKNOWN, and sink fields ALL, got: " + scheme.toString() );
512      }
513
514    Set<String> names = new HashSet<String>( asList( Pipe.names( flowTails ) ) );
515
516    for( String name : flowDef.getCheckpoints().keySet() )
517      {
518      if( !names.contains( name ) )
519        throw new PlannerException( "named checkpoint declared in FlowDef, but no named branch found in pipe assembly: '" + name + "'" );
520
521      Set<Pipe> pipes = new HashSet<Pipe>( asList( Pipe.named( name, flowTails ) ) );
522
523      int count = 0;
524
525      for( Pipe pipe : pipes )
526        {
527        if( pipe instanceof Checkpoint )
528          count++;
529        }
530
531      if( count == 0 )
532        throw new PlannerException( "no checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );
533
534      if( count > 1 )
535        throw new PlannerException( "more than one checkpoint pipe with branch name found in pipe assembly: '" + name + "'" );
536      }
537    }
538
539  private void verifyNotSourcesSinks( Map<String, Tap> taps, Map<String, Tap> sources, Map<String, Tap> sinks, String role )
540    {
541    Collection<Tap> sourceTaps = sources.values();
542    Collection<Tap> sinkTaps = sinks.values();
543
544    for( Tap tap : taps.values() )
545      {
546      if( sourceTaps.contains( tap ) )
547        throw new PlannerException( "tap may not be used as both a " + role + " and a source in the same Flow: " + tap );
548
549      if( sinkTaps.contains( tap ) )
550        throw new PlannerException( "tap may not be used as both a " + role + " and a sink in the same Flow: " + tap );
551      }
552    }
553
554  /**
555   * If there are rules for a given {@link cascading.flow.planner.rule.ProcessLevel} on the current platform
556   * there must be sub-graphs partitioned at that level.
557   */
558  public Exception verifyResult( RuleResult ruleResult )
559    {
560    try
561      {
562      verifyResultInternal( ruleResult );
563      }
564    catch( Exception exception )
565      {
566      return exception;
567      }
568
569    return null;
570    }
571
572  protected void verifyResultInternal( RuleResult ruleResult )
573    {
574    Set<ProcessLevel> processLevels = getReverseOrderedProcessLevels( ruleResult );
575
576    for( ProcessLevel processLevel : processLevels )
577      {
578      String registryName = ruleResult.getRegistry().getName();
579
580      switch( processLevel )
581        {
582        case Assembly:
583
584          FlowElementGraph finalFlowElementGraph = ruleResult.getAssemblyGraph();
585
586          if( finalFlowElementGraph.vertexSet().isEmpty() )
587            throw new PlannerException( "final assembly graph is empty: " + registryName );
588
589          break;
590
591        case Step:
592
593          Map<ElementGraph, List<? extends ElementGraph>> assemblyToSteps = ruleResult.getAssemblyToStepGraphMap();
594
595          if( assemblyToSteps.isEmpty() )
596            throw new PlannerException( "no steps partitioned: " + registryName );
597
598          for( ElementGraph assembly : assemblyToSteps.keySet() )
599            {
600            List<? extends ElementGraph> steps = assemblyToSteps.get( assembly );
601
602            if( steps.isEmpty() )
603              throw new PlannerException( "no steps partitioned from assembly: " + registryName, assembly );
604
605            Set<ElementGraph> stepSet = new HashSet<>( steps.size() );
606
607            for( ElementGraph step : steps )
608              {
609              if( !stepSet.add( step ) )
610                throw new PlannerException( "found duplicate step in flow: " + registryName, step );
611              }
612
613            Set<FlowElement> elements = createIdentitySet();
614
615            for( ElementGraph step : steps )
616              elements.addAll( step.vertexSet() );
617
618            Set<FlowElement> missing = differenceIdentity( assembly.vertexSet(), elements );
619
620            if( !missing.isEmpty() )
621              {
622              String message = "union of steps have " + missing.size() + " fewer elements than parent assembly: " + registryName + ", missing: [" + join( missing, ", " ) + "]";
623              throw new PlannerException( message, assembly );
624              }
625            }
626
627          break;
628
629        case Node:
630
631          Map<ElementGraph, List<? extends ElementGraph>> stepToNodes = ruleResult.getStepToNodeGraphMap();
632
633          if( stepToNodes.isEmpty() )
634            throw new PlannerException( "no nodes partitioned: " + registryName );
635
636          for( ElementGraph step : stepToNodes.keySet() )
637            {
638            List<? extends ElementGraph> nodes = stepToNodes.get( step );
639
640            if( nodes.isEmpty() )
641              throw new PlannerException( "no nodes partitioned from step: " + registryName, step );
642
643            Set<ElementGraph> nodesSet = new HashSet<>( nodes.size() );
644
645            for( ElementGraph node : nodes )
646              {
647              if( !nodesSet.add( node ) )
648                throw new PlannerException( "found duplicate node in step: " + registryName, node );
649              }
650
651            Set<FlowElement> elements = createIdentitySet();
652
653            for( ElementGraph node : nodes )
654              elements.addAll( node.vertexSet() );
655
656            Set<FlowElement> missing = differenceIdentity( step.vertexSet(), elements );
657
658            if( !missing.isEmpty() )
659              {
660              String message = "union of nodes have " + missing.size() + " fewer elements than parent step: " + registryName + ", missing: [" + join( missing, ", " ) + "]";
661              throw new PlannerException( message, step );
662              }
663            }
664
665          break;
666
667        case Pipeline:
668
669          // all nodes are partitioned into pipelines, but if partitioned all elements should be represented
670          Map<ElementGraph, List<? extends ElementGraph>> nodeToPipeline = ruleResult.getNodeToPipelineGraphMap();
671
672          if( nodeToPipeline.isEmpty() )
673            throw new PlannerException( "no pipelines partitioned: " + registryName );
674
675          for( ElementGraph node : nodeToPipeline.keySet() )
676            {
677            List<? extends ElementGraph> pipelines = nodeToPipeline.get( node );
678
679            if( pipelines.isEmpty() )
680              throw new PlannerException( "no pipelines partitioned from node: " + registryName, node );
681
682            Set<ElementGraph> pipelineSet = new HashSet<>( pipelines.size() );
683
684            for( ElementGraph pipeline : pipelines )
685              {
686              if( !pipelineSet.add( pipeline ) )
687                throw new PlannerException( "found duplicate pipeline in node: " + registryName, pipeline );
688              }
689
690            Set<FlowElement> elements = createIdentitySet();
691
692            for( ElementGraph pipeline : pipelines )
693              elements.addAll( pipeline.vertexSet() );
694
695            Set<FlowElement> missing = differenceIdentity( node.vertexSet(), elements );
696
697            if( !missing.isEmpty() )
698              {
699              String message = "union of pipelines have " + missing.size() + " fewer elements than parent node: " + registryName + ", missing: [" + join( missing, ", " ) + "]";
700              throw new PlannerException( message, node );
701              }
702            }
703
704          break;
705        }
706      }
707    }
708
709  protected PlannerException handleExceptionDuringPlanning( FlowDef flowDef, Exception exception, FlowElementGraph flowElementGraph )
710    {
711    if( exception instanceof PlannerException )
712      {
713      if( ( (PlannerException) exception ).elementGraph == null )
714        ( (PlannerException) exception ).elementGraph = flowElementGraph;
715
716      return (PlannerException) exception;
717      }
718    else if( exception instanceof ElementGraphException )
719      {
720      Throwable cause = exception.getCause();
721
722      if( cause == null )
723        cause = exception;
724
725      // captures pipegraph for debugging
726      // forward message in case cause or trace is lost
727      String message = String.format( "[%s] could not build flow from assembly", Util.truncate( flowDef.getName(), 25 ) );
728
729      if( cause.getMessage() != null )
730        message = String.format( "%s: [%s]", message, cause.getMessage() );
731
732      if( cause instanceof OperatorException )
733        return new PlannerException( message, cause, flowElementGraph );
734
735      if( cause instanceof TapException )
736        return new PlannerException( message, cause, flowElementGraph );
737
738      return new PlannerException( ( (ElementGraphException) exception ).getPipe(), message, cause, flowElementGraph );
739      }
740    else
741      {
742      // captures pipegraph for debugging
743      // forward message in case cause or trace is lost
744      String message = String.format( "[%s] could not build flow from assembly", Util.truncate( flowDef.getName(), 25 ) );
745
746      if( exception.getMessage() != null )
747        message = String.format( "%s: [%s]", message, exception.getMessage() );
748
749      return new PlannerException( message, exception, flowElementGraph );
750      }
751    }
752
753  public class TempTapElementFactory extends IntermediateTapElementFactory
754    {
755    @Override
756    public FlowElement create( ElementGraph graph, FlowElement flowElement )
757      {
758      return makeTempTap( (FlowElementGraph) graph, (Pipe) flowElement );
759      }
760    }
761
762  private Tap makeTempTap( FlowElementGraph graph, Pipe pipe )
763    {
764    Tap checkpointTap = graph.getCheckpointsMap().get( pipe.getName() );
765
766    if( checkpointTap != null )
767      {
768      LOG.info( "found checkpoint: {}, using tap: {}", pipe.getName(), checkpointTap );
769      checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS );
770      }
771
772    if( checkpointTap == null )
773      {
774      // only restart from a checkpoint pipe or checkpoint tap below
775      if( pipe instanceof Checkpoint )
776        {
777        checkpointTap = makeTempTap( checkpointTapRootPath, pipe.getName() );
778        checkpointTap = decorateTap( pipe, checkpointTap, FlowConnectorProps.CHECKPOINT_TAP_DECORATOR_CLASS );
779        // mark as an anonymous checkpoint
780        checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
781        }
782      else
783        {
784        checkpointTap = makeTempTap( pipe.getName() );
785        }
786      }
787
788    return decorateTap( pipe, checkpointTap, FlowConnectorProps.TEMPORARY_TAP_DECORATOR_CLASS );
789    }
790
791  private Tap decorateTap( Pipe pipe, Tap tempTap, String decoratorClass )
792    {
793    String decoratorClassName = PropertyUtil.getProperty( defaultProperties, pipe, decoratorClass );
794
795    if( Util.isEmpty( decoratorClassName ) )
796      return tempTap;
797
798    LOG.info( "found decorator: {}, wrapping tap: {}", decoratorClass, tempTap );
799
800    tempTap = Util.newInstance( decoratorClassName, tempTap );
801
802    return tempTap;
803    }
804
805  protected Tap makeTempTap( String name )
806    {
807    return makeTempTap( null, name );
808    }
809
810  protected DebugLevel getDebugLevel( FlowDef flowDef )
811    {
812    return flowDef.getDebugLevel() == null ? this.defaultDebugLevel : flowDef.getDebugLevel();
813    }
814
815  protected AssertionLevel getAssertionLevel( FlowDef flowDef )
816    {
817    return flowDef.getAssertionLevel() == null ? this.defaultAssertionLevel : flowDef.getAssertionLevel();
818    }
819
820  protected abstract Tap makeTempTap( String prefix, String name );
821
822  private Set<ProcessLevel> getReverseOrderedProcessLevels( RuleResult ruleResult )
823    {
824    Set<ProcessLevel> ordered = new TreeSet<>( Collections.reverseOrder() );
825
826    ordered.addAll( ruleResult.getRegistry().getProcessLevels() );
827
828    return ordered;
829    }
830  }