001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.iso.transformer;
023
024import java.util.Collections;
025import java.util.Set;
026
027import cascading.flow.FlowElement;
028import cascading.flow.planner.PlannerContext;
029import cascading.flow.planner.graph.ElementGraph;
030import cascading.flow.planner.iso.expression.ExpressionGraph;
031import cascading.flow.planner.iso.finder.GraphFinder;
032import cascading.flow.planner.iso.finder.Match;
033import cascading.flow.planner.rule.TransformException;
034import cascading.util.ProcessLogger;
035
036/**
037 *
038 */
039public abstract class RecursiveGraphTransformer<E extends ElementGraph> extends GraphTransformer<E, E>
040  {
041  /**
042   * Graphs must be transformed iteratively. In order to offset rules that may cause a loop, the recursion depth
043   * must be tracked.
044   * <p/>
045   * Some complex graphs may require a very deep search, so this value may need to be increased.
046   * <p/>
047   * During debugging, the depth may need to be shallow so that the trace logs can be written to disk before an
048   * OOME causes the planner to exit.
049   * <p/>
050   * This property may be either set at the planner ){@link cascading.flow.FlowConnector} or system level
051   * {@link System#getProperties()}.
052   */
053  public static final String TRANSFORM_RECURSION_DEPTH_MAX = "cascading.planner.transformer.recursion.depth.max";
054  public static final int DEFAULT_TRANSFORM_RECURSION_DEPTH_MAX = 1000;
055
056  private final GraphFinder finder;
057  private final ExpressionGraph expressionGraph;
058  private final boolean findAllPrimaries;
059
060  protected RecursiveGraphTransformer( ExpressionGraph expressionGraph )
061    {
062    this.expressionGraph = expressionGraph;
063    this.finder = new GraphFinder( expressionGraph );
064    this.findAllPrimaries = expressionGraph.supportsNonRecursiveMatch();
065    }
066
067  @Override
068  public Transformed<E> transform( PlannerContext plannerContext, E rootGraph )
069    {
070    int maxDepth = plannerContext.getIntProperty( TRANSFORM_RECURSION_DEPTH_MAX, DEFAULT_TRANSFORM_RECURSION_DEPTH_MAX );
071
072    Transformed<E> transformed = new Transformed<>( plannerContext, this, expressionGraph, rootGraph );
073
074    E result = transform( plannerContext.getLogger(), transformed, rootGraph, maxDepth, 0 );
075
076    transformed.setEndGraph( result );
077
078    return transformed;
079    }
080
081  protected E transform( ProcessLogger processLogger, Transformed<E> transformed, E graph, int maxDepth, int currentDepth )
082    {
083    if( currentDepth == maxDepth )
084      {
085      processLogger.logInfo( "!!! transform recursion ending, reached depth: {}", currentDepth );
086      return graph;
087      }
088
089    if( processLogger.isDebugEnabled() )
090      processLogger.logDebug( "preparing match within: {}", this.getClass().getSimpleName() );
091
092    ElementGraph prepared = prepareForMatch( processLogger, transformed, graph );
093
094    if( processLogger.isDebugEnabled() )
095      processLogger.logDebug( "completed match within: {}, with result: {}", this.getClass().getSimpleName(), prepared != null );
096
097    if( prepared == null )
098      return graph;
099
100    Set<FlowElement> exclusions = addExclusions( graph );
101
102    Match match;
103
104    if( processLogger.isDebugEnabled() )
105      processLogger.logDebug( "performing match within: {}, using recursion: {}", this.getClass().getSimpleName(), !findAllPrimaries );
106
107    // for trivial cases, disable recursion and capture all primaries initially
108    // if prepareForMatch returns a sub-graph, find all matches in the sub-graph, but we do not exit the recursion
109    if( findAllPrimaries )
110      match = finder.findAllMatches( transformed.getPlannerContext(), prepared, exclusions );
111    else
112      match = finder.findFirstMatch( transformed.getPlannerContext(), prepared, exclusions );
113
114    if( processLogger.isDebugEnabled() )
115      processLogger.logDebug( "completed match within: {}", this.getClass().getSimpleName() );
116
117    if( processLogger.isDebugEnabled() )
118      processLogger.logDebug( "performing transform in place within: {}", this.getClass().getSimpleName() );
119
120    boolean transformResult = transformGraphInPlaceUsingSafe( transformed, graph, match );
121
122    if( processLogger.isDebugEnabled() )
123      processLogger.logDebug( "completed transform in place within: {}, with result: {}", this.getClass().getSimpleName(), transformResult );
124
125    if( !transformResult )
126      return graph;
127
128    transformed.addRecursionTransform( graph );
129
130    if( !requiresRecursiveSearch() )
131      return graph;
132
133    return transform( processLogger, transformed, graph, maxDepth, ++currentDepth );
134    }
135
136  private boolean transformGraphInPlaceUsingSafe( Transformed<E> transformed, E graph, Match match )
137    {
138    try
139      {
140      return transformGraphInPlaceUsing( transformed, graph, match );
141      }
142    catch( TransformException exception )
143      {
144      throw exception;
145      }
146    catch( Throwable throwable )
147      {
148      throw new TransformException( throwable, transformed );
149      }
150    }
151
152  /**
153   * By default, prepareForMatch returns the same graph, but sub-classes may return a sub-graph, one of many
154   * requiring sub-sequent matches.
155   * <p/>
156   * if we are searching the whole graph, there is no need to perform a recursion against the new transformed graph
157   */
158  protected boolean requiresRecursiveSearch()
159    {
160    return !findAllPrimaries;
161    }
162
163  protected Set<FlowElement> addExclusions( E graph )
164    {
165    return Collections.emptySet();
166    }
167
168  protected ElementGraph prepareForMatch( ProcessLogger processLogger, Transformed<E> transformed, E graph )
169    {
170    return graph;
171    }
172
173  protected abstract boolean transformGraphInPlaceUsing( Transformed<E> transformed, E graph, Match match );
174  }