001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.iso.transformer; 022 023import java.util.Collections; 024import java.util.Set; 025 026import cascading.flow.FlowElement; 027import cascading.flow.planner.PlannerContext; 028import cascading.flow.planner.graph.ElementGraph; 029import cascading.flow.planner.iso.expression.ExpressionGraph; 030import cascading.flow.planner.iso.finder.GraphFinder; 031import cascading.flow.planner.iso.finder.Match; 032import cascading.util.ProcessLogger; 033 034/** 035 * 036 */ 037public abstract class RecursiveGraphTransformer<E extends ElementGraph> extends GraphTransformer<E, E> 038 { 039 /** 040 * Graphs must be transformed iteratively. In order to offset rules that may cause a loop, the recursion depth 041 * must be tracked. 042 * <p/> 043 * Some complex graphs may require a very deep search, so this value may need to be increased. 044 * <p/> 045 * During debugging, the depth may need to be shallow so that the trace logs can be written to disk before an 046 * OOME causes the planner to exit. 047 * <p/> 048 * This property may be either set at the planner ){@link cascading.flow.FlowConnector} or system level 049 * {@link System#getProperties()}. 050 */ 051 public static final String TRANSFORM_RECURSION_DEPTH_MAX = "cascading.planner.transformer.recursion.depth.max"; 052 public static final int DEFAULT_TRANSFORM_RECURSION_DEPTH_MAX = 1000; 053 054 private final GraphFinder finder; 055 private final ExpressionGraph expressionGraph; 056 private final boolean findAllPrimaries; 057 058 protected RecursiveGraphTransformer( ExpressionGraph expressionGraph ) 059 { 060 this.expressionGraph = expressionGraph; 061 this.finder = new GraphFinder( expressionGraph ); 062 this.findAllPrimaries = expressionGraph.supportsNonRecursiveMatch(); 063 } 064 065 @Override 066 public Transformed<E> transform( PlannerContext plannerContext, E rootGraph ) 067 { 068 int maxDepth = plannerContext.getIntProperty( TRANSFORM_RECURSION_DEPTH_MAX, DEFAULT_TRANSFORM_RECURSION_DEPTH_MAX ); 069 070 Transformed<E> transformed = new Transformed<>( plannerContext, this, expressionGraph, rootGraph ); 071 072 E result = transform( plannerContext.getLogger(), transformed, rootGraph, maxDepth, 0 ); 073 074 transformed.setEndGraph( result ); 075 076 return transformed; 077 } 078 079 protected E transform( ProcessLogger processLogger, Transformed<E> transformed, E graph, int maxDepth, int currentDepth ) 080 { 081 if( currentDepth == maxDepth ) 082 { 083 processLogger.logInfo( "!!! transform recursion ending, reached depth: {}", currentDepth ); 084 return graph; 085 } 086 087 if( processLogger.isDebugEnabled() ) 088 processLogger.logDebug( "preparing match within: {}", this.getClass().getSimpleName() ); 089 090 ElementGraph prepared = prepareForMatch( processLogger, transformed, graph ); 091 092 if( processLogger.isDebugEnabled() ) 093 processLogger.logDebug( "completed match within: {}, with result: {}", this.getClass().getSimpleName(), prepared != null ); 094 095 if( prepared == null ) 096 return graph; 097 098 Set<FlowElement> exclusions = addExclusions( graph ); 099 100 Match match; 101 102 if( processLogger.isDebugEnabled() ) 103 processLogger.logDebug( "performing match within: {}, using recursion: {}", this.getClass().getSimpleName(), !findAllPrimaries ); 104 105 // for trivial cases, disable recursion and capture all primaries initially 106 // if prepareForMatch returns a sub-graph, find all matches in the sub-graph, but we do not exit the recursion 107 if( findAllPrimaries ) 108 match = finder.findAllMatches( transformed.getPlannerContext(), prepared, exclusions ); 109 else 110 match = finder.findFirstMatch( transformed.getPlannerContext(), prepared, exclusions ); 111 112 if( processLogger.isDebugEnabled() ) 113 processLogger.logDebug( "completed match within: {}", this.getClass().getSimpleName() ); 114 115 if( processLogger.isDebugEnabled() ) 116 processLogger.logDebug( "performing transform in place within: {}", this.getClass().getSimpleName() ); 117 118 boolean transformResult = transformGraphInPlaceUsing( transformed, graph, match ); 119 120 if( processLogger.isDebugEnabled() ) 121 processLogger.logDebug( "completed transform in place within: {}, with result: {}", this.getClass().getSimpleName(), transformResult ); 122 123 if( !transformResult ) 124 return graph; 125 126 transformed.addRecursionTransform( graph ); 127 128 if( !requiresRecursiveSearch() ) 129 return graph; 130 131 return transform( processLogger, transformed, graph, maxDepth, ++currentDepth ); 132 } 133 134 /** 135 * By default, prepareForMatch returns the same graph, but sub-classes may return a sub-graph, one of many 136 * requiring sub-sequent matches. 137 * <p/> 138 * if we are searching the whole graph, there is no need to perform a recursion against the new transformed graph 139 */ 140 protected boolean requiresRecursiveSearch() 141 { 142 return !findAllPrimaries; 143 } 144 145 protected Set<FlowElement> addExclusions( E graph ) 146 { 147 return Collections.emptySet(); 148 } 149 150 protected ElementGraph prepareForMatch( ProcessLogger processLogger, Transformed<E> transformed, E graph ) 151 { 152 return graph; 153 } 154 155 protected abstract boolean transformGraphInPlaceUsing( Transformed<E> transformed, E graph, Match match ); 156 }