001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.planner.iso.transformer; 023 024import java.util.Collections; 025import java.util.Set; 026 027import cascading.flow.FlowElement; 028import cascading.flow.planner.PlannerContext; 029import cascading.flow.planner.graph.ElementGraph; 030import cascading.flow.planner.iso.expression.ExpressionGraph; 031import cascading.flow.planner.iso.finder.GraphFinder; 032import cascading.flow.planner.iso.finder.Match; 033import cascading.flow.planner.rule.TransformException; 034import cascading.util.ProcessLogger; 035 036/** 037 * 038 */ 039public abstract class RecursiveGraphTransformer<E extends ElementGraph> extends GraphTransformer<E, E> 040 { 041 /** 042 * Graphs must be transformed iteratively. In order to offset rules that may cause a loop, the recursion depth 043 * must be tracked. 044 * <p/> 045 * Some complex graphs may require a very deep search, so this value may need to be increased. 046 * <p/> 047 * During debugging, the depth may need to be shallow so that the trace logs can be written to disk before an 048 * OOME causes the planner to exit. 049 * <p/> 050 * This property may be either set at the planner ){@link cascading.flow.FlowConnector} or system level 051 * {@link System#getProperties()}. 052 */ 053 public static final String TRANSFORM_RECURSION_DEPTH_MAX = "cascading.planner.transformer.recursion.depth.max"; 054 public static final int DEFAULT_TRANSFORM_RECURSION_DEPTH_MAX = 1000; 055 056 private final GraphFinder finder; 057 private final ExpressionGraph expressionGraph; 058 private final boolean findAllPrimaries; 059 060 protected RecursiveGraphTransformer( ExpressionGraph expressionGraph ) 061 { 062 this.expressionGraph = expressionGraph; 063 this.finder = new GraphFinder( expressionGraph ); 064 this.findAllPrimaries = expressionGraph.supportsNonRecursiveMatch(); 065 } 066 067 @Override 068 public Transformed<E> transform( PlannerContext plannerContext, E rootGraph ) 069 { 070 int maxDepth = plannerContext.getIntProperty( TRANSFORM_RECURSION_DEPTH_MAX, DEFAULT_TRANSFORM_RECURSION_DEPTH_MAX ); 071 072 Transformed<E> transformed = new Transformed<>( plannerContext, this, expressionGraph, rootGraph ); 073 074 E result = transform( plannerContext.getLogger(), transformed, rootGraph, maxDepth, 0 ); 075 076 transformed.setEndGraph( result ); 077 078 return transformed; 079 } 080 081 protected E transform( ProcessLogger processLogger, Transformed<E> transformed, E graph, int maxDepth, int currentDepth ) 082 { 083 if( currentDepth == maxDepth ) 084 { 085 processLogger.logInfo( "!!! transform recursion ending, reached depth: {}", currentDepth ); 086 return graph; 087 } 088 089 if( processLogger.isDebugEnabled() ) 090 processLogger.logDebug( "preparing match within: {}", this.getClass().getSimpleName() ); 091 092 ElementGraph prepared = prepareForMatch( processLogger, transformed, graph ); 093 094 if( processLogger.isDebugEnabled() ) 095 processLogger.logDebug( "completed match within: {}, with result: {}", this.getClass().getSimpleName(), prepared != null ); 096 097 if( prepared == null ) 098 return graph; 099 100 Set<FlowElement> exclusions = addExclusions( graph ); 101 102 Match match; 103 104 if( processLogger.isDebugEnabled() ) 105 processLogger.logDebug( "performing match within: {}, using recursion: {}", this.getClass().getSimpleName(), !findAllPrimaries ); 106 107 // for trivial cases, disable recursion and capture all primaries initially 108 // if prepareForMatch returns a sub-graph, find all matches in the sub-graph, but we do not exit the recursion 109 if( findAllPrimaries ) 110 match = finder.findAllMatches( transformed.getPlannerContext(), prepared, exclusions ); 111 else 112 match = finder.findFirstMatch( transformed.getPlannerContext(), prepared, exclusions ); 113 114 if( processLogger.isDebugEnabled() ) 115 processLogger.logDebug( "completed match within: {}", this.getClass().getSimpleName() ); 116 117 if( processLogger.isDebugEnabled() ) 118 processLogger.logDebug( "performing transform in place within: {}", this.getClass().getSimpleName() ); 119 120 boolean transformResult = transformGraphInPlaceUsingSafe( transformed, graph, match ); 121 122 if( processLogger.isDebugEnabled() ) 123 processLogger.logDebug( "completed transform in place within: {}, with result: {}", this.getClass().getSimpleName(), transformResult ); 124 125 if( !transformResult ) 126 return graph; 127 128 transformed.addRecursionTransform( graph ); 129 130 if( !requiresRecursiveSearch() ) 131 return graph; 132 133 return transform( processLogger, transformed, graph, maxDepth, ++currentDepth ); 134 } 135 136 private boolean transformGraphInPlaceUsingSafe( Transformed<E> transformed, E graph, Match match ) 137 { 138 try 139 { 140 return transformGraphInPlaceUsing( transformed, graph, match ); 141 } 142 catch( TransformException exception ) 143 { 144 throw exception; 145 } 146 catch( Throwable throwable ) 147 { 148 throw new TransformException( throwable, transformed ); 149 } 150 } 151 152 /** 153 * By default, prepareForMatch returns the same graph, but sub-classes may return a sub-graph, one of many 154 * requiring sub-sequent matches. 155 * <p/> 156 * if we are searching the whole graph, there is no need to perform a recursion against the new transformed graph 157 */ 158 protected boolean requiresRecursiveSearch() 159 { 160 return !findAllPrimaries; 161 } 162 163 protected Set<FlowElement> addExclusions( E graph ) 164 { 165 return Collections.emptySet(); 166 } 167 168 protected ElementGraph prepareForMatch( ProcessLogger processLogger, Transformed<E> transformed, E graph ) 169 { 170 return graph; 171 } 172 173 protected abstract boolean transformGraphInPlaceUsing( Transformed<E> transformed, E graph, Match match ); 174 }