001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.rule;
023
024import java.util.ArrayList;
025import java.util.LinkedHashMap;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030
031import cascading.flow.FlowElement;
032import cascading.flow.planner.PlannerContext;
033import cascading.flow.planner.PlannerException;
034import cascading.flow.planner.graph.AnnotatedGraph;
035import cascading.flow.planner.graph.BoundedElementMultiGraph;
036import cascading.flow.planner.graph.ElementGraph;
037import cascading.flow.planner.graph.ElementGraphs;
038import cascading.flow.planner.graph.ElementMultiGraph;
039import cascading.flow.planner.graph.FlowElementGraph;
040import cascading.flow.planner.graph.IgnoreAnnotationsHashSet;
041import cascading.flow.planner.iso.GraphResult;
042import cascading.flow.planner.iso.assertion.Asserted;
043import cascading.flow.planner.iso.assertion.GraphAssert;
044import cascading.flow.planner.iso.subgraph.Partitions;
045import cascading.flow.planner.iso.transformer.GraphTransformer;
046import cascading.flow.planner.iso.transformer.Transformed;
047import cascading.flow.planner.rule.util.TraceWriter;
048import cascading.util.EnumMultiMap;
049import cascading.util.ProcessLogger;
050
051import static cascading.util.Util.createIdentitySet;
052import static cascading.util.Util.formatDurationFromMillis;
053import static java.lang.String.format;
054
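/**
 * Class RuleExec applies the rules held by a {@link RuleRegistry} to a {@link FlowElementGraph}.
 * <p>
 * Every {@link PlanPhase} is visited in order; depending on the phase, the assembly fields are resolved
 * or the rules registered for that phase are executed. The outcome, including durations and any
 * planner failure, is collected in a {@link RuleResult}.
 */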
058public class RuleExec
059  {
060  private static final int ELEMENT_THRESHOLD = 600;
061
062  final TraceWriter traceWriter;
063  final RuleRegistry registry;
064
065  public RuleExec( TraceWriter traceWriter, RuleRegistry registry )
066    {
067    this.traceWriter = traceWriter;
068    this.registry = registry;
069    }
070
071  public RuleResult exec( PlannerContext plannerContext, FlowElementGraph flowElementGraph )
072    {
073    RuleResult ruleResult = new RuleResult( registry, flowElementGraph );
074
075    ProcessLogger logger = plannerContext.getLogger();
076    int size = flowElementGraph.vertexSet().size();
077    boolean logAsInfo = size >= ELEMENT_THRESHOLD;
078
079    if( logAsInfo )
080      logger.logInfo( "elements in graph: {}, info logging threshold: {}, logging planner execution status", size, ELEMENT_THRESHOLD );
081
082    long beginExec = System.currentTimeMillis();
083
084    try
085      {
086      planPhases( plannerContext, logAsInfo, ruleResult );
087      }
088    catch( Exception exception )
089      {
090      ruleResult.setPlannerException( exception );
091      }
092    finally
093      {
094      long endExec = System.currentTimeMillis();
095
096      ruleResult.setDuration( beginExec, endExec );
097
098      RuleResult.ResultStatus status = ruleResult.getResultStatus();
099      String duration = formatDurationFromMillis( endExec - beginExec );
100      logPhase( logger, logAsInfo, "rule registry completed: {}, with status: {}, and duration: {}", registry.getName(), status, duration );
101      }
102
103    return ruleResult;
104    }
105
106  protected void planPhases( PlannerContext plannerContext, boolean logAsInfo, RuleResult ruleResult )
107    {
108    ProcessLogger logger = plannerContext.getLogger();
109
110    for( PlanPhase phase : PlanPhase.values() ) // iterate in order, all planner phases
111      {
112      long beginPhase = System.currentTimeMillis();
113
114      logPhase( logger, logAsInfo, "starting rule phase: {}", phase );
115
116      try
117        {
118        switch( phase.getAction() )
119          {
120          case Resolve:
121            resolveElements( ruleResult );
122            break;
123
124          case Rule:
125            executeRulePhase( phase, plannerContext, ruleResult );
126            break;
127          }
128        }
129      finally
130        {
131        long endPhase = System.currentTimeMillis();
132
133        ruleResult.setPhaseDuration( phase, beginPhase, endPhase );
134
135        logPhase( logger, logAsInfo, "ending rule phase: {}, duration: {}", phase, formatDurationFromMillis( endPhase - beginPhase ) );
136        }
137      }
138    }
139
140  private void resolveElements( RuleResult ruleResult )
141    {
142    if( !registry.enabledResolveElements() )
143      return;
144
145    FlowElementGraph elementGraph = ruleResult.getAssemblyGraph();
146
147    elementGraph = (FlowElementGraph) elementGraph.copyElementGraph();
148
149    ScopeResolver.resolveFields( elementGraph );
150
151    ruleResult.setLevelResults( ProcessLevel.Assembly, ruleResult.initialAssembly, elementGraph );
152    }
153
154  public RuleResult executeRulePhase( PlanPhase phase, PlannerContext plannerContext, RuleResult ruleResult )
155    {
156    ProcessLogger logger = plannerContext.getLogger();
157
158    logger.logDebug( "executing plan phase: {}", phase );
159
160    LinkedList<Rule> rules = registry.getRulesFor( phase );
161
162    writePhaseInitPlan( phase, ruleResult );
163
164    try
165      {
166      // within this phase, execute all rules in declared order
167      for( Rule rule : rules )
168        {
169        logger.logDebug( "executing rule: {}", rule );
170
171        long begin = System.currentTimeMillis();
172
173        try
174          {
175          switch( phase.getMode() )
176            {
177            case Mutate:
178              performMutation( plannerContext, ruleResult, phase, rule );
179              break;
180
181            case Partition:
182              performPartition( plannerContext, ruleResult, phase, rule );
183              break;
184            }
185          }
186        catch( UnsupportedPlanException exception )
187          {
188          logger.logDebug( "executing rule failed: {}, message: {}", rule, exception.getMessage() );
189
190          throw new UnsupportedPlanException( rule, exception );
191          }
192        catch( PlannerException exception )
193          {
194          logger.logDebug( "executing rule failed: {}, message: {}", rule, exception.getMessage() );
195
196          throw exception; // rethrow
197          }
198        catch( Exception exception )
199          {
200          logger.logDebug( "executing rule failed: {}, message: {}", rule, exception.getMessage() );
201
202          throw new PlannerException( registry, phase, rule, exception );
203          }
204        finally
205          {
206          long end = System.currentTimeMillis();
207
208          ruleResult.setRuleDuration( rule, begin, end );
209
210          logger.logDebug( "completed rule: {}", rule );
211          }
212        }
213
214      return ruleResult;
215      }
216    finally
217      {
218      logger.logDebug( "completed plan phase: {}", phase );
219      writePhaseResultPlan( phase, ruleResult );
220      }
221    }
222
223  protected void performMutation( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, Rule rule )
224    {
225    if( rule instanceof GraphTransformer )
226      performTransform( plannerContext, ruleResult, phase, (GraphTransformer) rule );
227    else if( rule instanceof GraphAssert )
228      performAssertion( plannerContext, ruleResult, phase, (GraphAssert) rule );
229    else
230      throw new PlannerException( "unexpected rule: " + rule.getRuleName() );
231    }
232
233  private void performPartition( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, Rule rule )
234    {
235    if( !( rule instanceof RulePartitioner ) )
236      throw new PlannerException( "unexpected rule: " + rule.getRuleName() );
237
238    RulePartitioner partitioner = (RulePartitioner) rule;
239
240    if( partitioner.getPartitionSource() == RulePartitioner.PartitionSource.PartitionParent )
241      handleParentPartitioning( plannerContext, ruleResult, phase, partitioner );
242    else if( partitioner.getPartitionSource() == RulePartitioner.PartitionSource.PartitionCurrent )
243      handleCurrentPartitioning( plannerContext, ruleResult, phase, partitioner );
244    else
245      throw new IllegalStateException( "unknown partitioning type: " + partitioner.getPartitionSource() );
246    }
247
248  private void handleCurrentPartitioning( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, RulePartitioner partitioner )
249    {
250    Map<ElementGraph, List<? extends ElementGraph>> priorResults = ruleResult.getLevelResults( phase.getLevel() );
251
252    Map<ElementGraph, List<? extends ElementGraph>> subGraphs = new LinkedHashMap<>();
253
254    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : priorResults.entrySet() )
255      {
256      ElementGraph parent = entry.getKey();
257      List<? extends ElementGraph> priors = entry.getValue();
258
259      List<ElementGraph> resultChildren = new ArrayList<>( priors );
260
261      Set<FlowElement> exclusions = getExclusions( priors, partitioner.getAnnotationExcludes() );
262
263      for( ElementGraph child : priors )
264        {
265        ElementGraph priorAnnotated = annotateWithPriors( child, priors );
266
267        Partitions partitions;
268
269        try
270          {
271          partitions = partitioner.partition( plannerContext, priorAnnotated, exclusions );
272          }
273        catch( Throwable throwable )
274          {
275          throw new PlannerException( registry, phase, partitioner, priorAnnotated, throwable );
276          }
277
278        writeTransformTrace( ruleResult, phase, partitioner, parent, child, partitions );
279
280        List<ElementGraph> results = makeBoundedOn( ruleResult.getAssemblyGraph(), partitions.getAnnotatedSubGraphs() );
281
282        if( results.isEmpty() )
283          continue;
284
285        // ignore annotations on equality, but replace an newer graph with prior
286        IgnoreAnnotationsHashSet uniques = new IgnoreAnnotationsHashSet( results );
287
288        if( uniques.size() != results.size() )
289          throw new PlannerException( "rule created duplicate element graphs" );
290
291        // replace child with partitioned results
292        resultChildren.remove( child );
293
294        for( ElementGraph prior : resultChildren )
295          {
296          if( !uniques.add( prior ) ) // todo: setting to force failure on duplicates
297            plannerContext.getLogger().logDebug( "re-partition rule created duplicate element graph to prior partitioner: {}, replacing duplicate result", partitioner.getRuleName() );
298          }
299
300        // order no longer preserved
301        resultChildren = uniques.asList();
302        }
303
304      subGraphs.put( parent, resultChildren );
305      }
306
307    ruleResult.setLevelResults( phase.getLevel(), subGraphs );
308    }
309
310  private void handleParentPartitioning( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, RulePartitioner partitioner )
311    {
312    Map<ElementGraph, List<? extends ElementGraph>> priorResults = ruleResult.getLevelResults( phase.getLevel() );
313
314    Map<ElementGraph, List<? extends ElementGraph>> subGraphs = new LinkedHashMap<>();
315
316    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : priorResults.entrySet() )
317      {
318      ElementGraph parent = entry.getKey();
319      List<? extends ElementGraph> priors = entry.getValue();
320
321      Set<FlowElement> exclusions = getExclusions( priors, partitioner.getAnnotationExcludes() );
322      ElementGraph priorAnnotated = annotateWithPriors( parent, priors );
323
324      Partitions partitions;
325
326      try
327        {
328        partitions = partitioner.partition( plannerContext, priorAnnotated, exclusions );
329        }
330      catch( Throwable throwable )
331        {
332        throw new PlannerException( registry, phase, partitioner, priorAnnotated, throwable );
333        }
334
335      writeTransformTrace( ruleResult, phase, partitioner, parent, null, partitions );
336
337      List<ElementGraph> results = makeBoundedOn( ruleResult.getAssemblyGraph(), partitions.getAnnotatedSubGraphs() );
338
339      // ignore annotations on equality, but replace an newer graph with prior
340      IgnoreAnnotationsHashSet uniques = new IgnoreAnnotationsHashSet( results );
341
342      if( uniques.size() != results.size() )
343        throw new PlannerException( "rule created duplicate element graphs" );
344
345      for( ElementGraph prior : priors )
346        {
347        if( !uniques.add( prior ) ) // todo: setting to force failure on duplicates
348          plannerContext.getLogger().logDebug( "partition rule created duplicate element graph to prior partitioner: {}, replacing duplicate result", partitioner.getRuleName() );
349        }
350
351      // order no longer preserved
352      subGraphs.put( parent, uniques.asList() );
353      }
354
355    ruleResult.setLevelResults( phase.getLevel(), subGraphs );
356    }
357
358  private void performAssertion( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, GraphAssert asserter )
359    {
360    plannerContext.getLogger().logDebug( "applying assertion: {}", ( (Rule) asserter ).getRuleName() );
361
362    Map<ElementGraph, List<? extends ElementGraph>> levelResults = ruleResult.getLevelResults( phase.getLevel() );
363
364    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : levelResults.entrySet() )
365      {
366      ElementGraph parent = entry.getKey(); // null for root case
367      List<? extends ElementGraph> children = entry.getValue();
368
369      for( ElementGraph child : children )
370        {
371        Asserted asserted;
372
373        try
374          {
375          asserted = asserter.assertion( plannerContext, child );
376          }
377        catch( Throwable throwable )
378          {
379          throw new PlannerException( registry, phase, (Rule) asserter, child, throwable );
380          }
381
382        writeTransformTrace( ruleResult, phase, (Rule) asserter, parent, child, asserted );
383
384        FlowElement primary = asserted.getFirstAnchor();
385
386        if( primary == null )
387          continue;
388
389        if( asserted.getAssertionType() == GraphAssert.AssertionType.Unsupported )
390          throw new UnsupportedPlanException( asserted.getFirstAnchor(), asserted.getMessage() );
391        else // only two options
392          throw new PlannerException( asserted.getFirstAnchor(), asserted.getMessage() );
393        }
394      }
395    }
396
397  private void performTransform( PlannerContext plannerContext, RuleResult ruleResult, PlanPhase phase, GraphTransformer transformer )
398    {
399    plannerContext.getLogger().logDebug( "applying transform: {}", ( (Rule) transformer ).getRuleName() );
400
401    Map<ElementGraph, List<? extends ElementGraph>> levelResults = ruleResult.getLevelResults( phase.getLevel() );
402
403    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : levelResults.entrySet() )
404      {
405      ElementGraph parent = entry.getKey(); // null for root case
406      List<? extends ElementGraph> children = entry.getValue();
407
408      List<ElementGraph> results = new ArrayList<>();
409
410      for( ElementGraph child : children )
411        {
412        Transformed transformed;
413
414        try
415          {
416          transformed = transformer.transform( plannerContext, child );
417          }
418        catch( TransformException exception )
419          {
420          writeTransformTrace( ruleResult, phase, (Rule) transformer, parent, child, exception.getTransformed() );
421
422          throw new PlannerException( registry, phase, (Rule) transformer, child, exception.getCause() );
423          }
424        catch( Throwable throwable )
425          {
426          throw new PlannerException( registry, phase, (Rule) transformer, child, throwable );
427          }
428
429        writeTransformTrace( ruleResult, phase, (Rule) transformer, parent, child, transformed );
430
431        ElementGraph endGraph = transformed.getEndGraph();
432
433        if( endGraph == null )
434          results.add( child );
435        else if( !ElementGraphs.isEmpty( endGraph ) )
436          results.add( endGraph );
437        }
438
439      ruleResult.setLevelResults( phase.getLevel(), parent, results );
440      }
441    }
442
443  private ElementGraph annotateWithPriors( ElementGraph elementGraph, List<? extends ElementGraph> priorResults )
444    {
445    if( priorResults == null )
446      return elementGraph;
447
448    // the results are sub-graphs of the elementGraph, so guaranteed to exist in graph
449    AnnotatedGraph resultGraph = new ElementMultiGraph( elementGraph );
450
451    for( ElementGraph result : priorResults )
452      {
453      if( !( result instanceof AnnotatedGraph ) || !( (AnnotatedGraph) result ).hasAnnotations() )
454        continue;
455
456      EnumMultiMap<FlowElement> annotations = ( (AnnotatedGraph) result ).getAnnotations();
457
458      resultGraph.getAnnotations().addAll( annotations );
459      }
460
461    return (ElementGraph) resultGraph;
462    }
463
464  private Set<FlowElement> getExclusions( List<? extends ElementGraph> elementGraphs, Enum[] annotationExcludes )
465    {
466    if( elementGraphs == null )
467      return null;
468
469    Set<FlowElement> exclusions = createIdentitySet();
470
471    for( ElementGraph elementGraph : elementGraphs )
472      {
473      if( !( elementGraph instanceof AnnotatedGraph ) || !( (AnnotatedGraph) elementGraph ).hasAnnotations() )
474        continue;
475
476      for( Enum annotationExclude : annotationExcludes )
477        {
478        Set<FlowElement> flowElements = ( (AnnotatedGraph) elementGraph ).getAnnotations().getValues( annotationExclude );
479
480        if( flowElements != null )
481          exclusions.addAll( flowElements );
482        }
483      }
484
485    return exclusions;
486    }
487
488  // use the final assembly graph so we can get Scopes for heads and tails
489  private List<ElementGraph> makeBoundedOn( ElementGraph currentElementGraph, Map<ElementGraph, EnumMultiMap> subGraphs )
490    {
491    List<ElementGraph> results = new ArrayList<>( subGraphs.size() );
492
493    for( ElementGraph subGraph : subGraphs.keySet() )
494      results.add( new BoundedElementMultiGraph( currentElementGraph, subGraph, subGraphs.get( subGraph ) ) );
495
496    return results;
497    }
498
499  private void writePhaseInitPlan( PlanPhase phase, RuleResult ruleResult )
500    {
501    switch( phase.getLevel() )
502      {
503      case Assembly:
504        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getAssemblyGraph(), format( "%02d-%s-init.dot", phase.ordinal(), phase ) );
505        break;
506      case Step:
507        break;
508      case Node:
509        break;
510      case Pipeline:
511        break;
512      }
513    }
514
515  private void writePhaseResultPlan( PlanPhase phase, RuleResult ruleResult )
516    {
517    switch( phase.getLevel() )
518      {
519      case Assembly:
520        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getAssemblyGraph(), format( "%02d-%s-result.dot", phase.ordinal(), phase ) );
521        break;
522      case Step:
523        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getAssemblyToStepGraphMap().get( ruleResult.getAssemblyGraph() ), phase, "result" );
524        break;
525      case Node:
526        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getStepToNodeGraphMap(), phase, "result" );
527        break;
528      case Pipeline:
529        traceWriter.writeTransformPlan( registry.getName(), ruleResult.getStepToNodeGraphMap(), ruleResult.getNodeToPipelineGraphMap(), phase, "result" );
530        break;
531      }
532    }
533
534  private void logPhase( ProcessLogger logger, boolean logAsInfo, String message, Object... items )
535    {
536    if( logAsInfo )
537      logger.logInfo( message, items );
538    else
539      logger.logDebug( message, items );
540    }
541
542  private void writeTransformTrace( RuleResult ruleResult, PlanPhase phase, Rule rule, ElementGraph parent, ElementGraph child, GraphResult result )
543    {
544    if( traceWriter.isTransformTraceDisabled() )
545      return;
546
547    int[] path = child != null ? ruleResult.getPathFor( parent, child ) : ruleResult.getPathFor( parent );
548
549    traceWriter.writeTransformPlan( registry.getName(), phase, rule, path, result );
550    }
551  }