/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner.rule;

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.FlowElementGraph;
import cascading.flow.planner.rule.util.ResultTree;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.Util.formatDurationFromMillis;

/**
 * Class RuleResult collects what a planner run records: the parent and child {@link ElementGraph}
 * instances registered per {@link ProcessLevel}, an optional planner exception, and the durations
 * measured for the whole run, for each {@link PlanPhase}, and for each individual rule.
 */
public class RuleResult
  {
  /** Rules whose recorded duration exceeds this many seconds are logged at info level by {@link #setRuleDuration(Rule, long, long)}. */
  public static final int THRESHOLD_SECONDS = 10;

  private static final Logger LOG = LoggerFactory.getLogger( RuleResult.class );

  /**
   * The status derived from the planner exception, see {@link #getResultStatus()}.
   */
  public enum ResultStatus
    {
      /** No planner exception was set. */
      SUCCESS,
      /** The planner exception is an {@link UnsupportedPlanException}. */
      UNSUPPORTED,
      /** A planner exception was set that is neither unsupported nor interrupted. */
      ILLEGAL,
      /** The planner exception is an {@link InterruptedException}. */
      INTERRUPTED
    }

  /** The parent graphs registered for each level, in registration order. */
  private final Map<ProcessLevel, Set<ElementGraph>> levelParents = new HashMap<>();
  /** Holds the parent to children relationships across all levels. */
  private final ResultTree resultTree = new ResultTree();

  /** Total duration as set by {@link #setDuration(long, long)}. */
  private long duration = 0;
  /** Duration per phase, in the order the phases were recorded. */
  private final Map<PlanPhase, Long> phaseDurations = new LinkedHashMap<>();
  /** Duration per rule name, grouped by phase, in the order the rules were recorded. */
  private final Map<PlanPhase, Map<String, Long>> ruleDurations = new LinkedHashMap<>();

  protected FlowElementGraph initialAssembly;
  private RuleRegistry registry;
  private Exception plannerException;

  /**
   * Creates an empty result with an empty parent set registered for every {@link ProcessLevel}.
   */
  public RuleResult()
    {
    for( ProcessLevel level : ProcessLevel.values() )
      levelParents.put( level, new LinkedHashSet<ElementGraph>() );
    }

  /**
   * Creates an empty result associated with the given registry.
   *
   * @param registry the registry returned by {@link #getRegistry()}
   */
  public RuleResult( RuleRegistry registry )
    {
    this();

    this.registry = registry;
    }

  /**
   * Creates a result initialized via {@link #initResult(FlowElementGraph)}.
   *
   * @param initialAssembly the initial assembly graph
   */
  public RuleResult( FlowElementGraph initialAssembly )
    {
    this();

    initResult( initialAssembly );
    }

  /**
   * Creates a result associated with the given registry and initialized via {@link #initResult(FlowElementGraph)}.
   *
   * @param registry        the registry returned by {@link #getRegistry()}
   * @param initialAssembly the initial assembly graph
   */
  public RuleResult( RuleRegistry registry, FlowElementGraph initialAssembly )
    {
    this();
    this.registry = registry;

    initResult( initialAssembly );
    }

  /**
   * @return the registry given at construction, or {@code null} if none was given
   */
  public RuleRegistry getRegistry()
    {
    return registry;
    }

  /**
   * @param plannerException the exception to record, a {@code null} value marks this result as successful
   */
  public void setPlannerException( Exception plannerException )
    {
    this.plannerException = plannerException;
    }

  /**
   * @return the recorded planner exception, or {@code null} if none was set
   */
  public Exception getPlannerException()
    {
    return plannerException;
    }

  /**
   * @return true if a planner exception was recorded
   */
  public boolean hasPlannerException()
    {
    return plannerException != null;
    }

  /**
   * @return true if no planner exception was recorded
   */
  public boolean isSuccess()
    {
    return !hasPlannerException();
    }

  /**
   * @return true if a planner exception was recorded that is neither unsupported nor interrupted
   */
  public boolean isIllegal()
    {
    return !isSuccess() && !isUnsupported() && !isInterrupted();
    }

  /**
   * @return true if the recorded planner exception is an {@link UnsupportedPlanException}
   */
  public boolean isUnsupported()
    {
    return getPlannerException() instanceof UnsupportedPlanException;
    }

  /**
   * @return true if the recorded planner exception is an {@link InterruptedException}
   */
  public boolean isInterrupted()
    {
    return getPlannerException() instanceof InterruptedException;
    }

  /**
   * Maps the recorded planner exception, if any, to a {@link ResultStatus}.
   *
   * @return SUCCESS, UNSUPPORTED or INTERRUPTED when the matching predicate holds, otherwise ILLEGAL
   */
  public ResultStatus getResultStatus()
    {
    if( isSuccess() )
      return ResultStatus.SUCCESS;

    if( isUnsupported() )
      return ResultStatus.UNSUPPORTED;

    if( isInterrupted() )
      return ResultStatus.INTERRUPTED;

    return ResultStatus.ILLEGAL;
    }

  /**
   * Stores the initial assembly and registers a copy of it, obtained from
   * {@code copyElementGraph()}, as its only child at the {@link ProcessLevel#Assembly} level.
   *
   * @param initialAssembly the initial assembly graph
   */
  public void initResult( FlowElementGraph initialAssembly )
    {
    this.initialAssembly = initialAssembly;

    setLevelResults( ProcessLevel.Assembly, initialAssembly, initialAssembly.copyElementGraph() );
    }

  /**
   * Registers every parent to children entry of the given map at the given level.
   *
   * @param level   the level the parents belong to
   * @param results the parents mapped to their children
   */
  public void setLevelResults( ProcessLevel level, Map<ElementGraph, List<? extends ElementGraph>> results )
    {
    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : results.entrySet() )
      setLevelResults( level, entry.getKey(), entry.getValue() );
    }

  /**
   * Registers a parent with a single child at the given level.
   *
   * @param level  the level the parent belongs to
   * @param parent the parent graph
   * @param child  the only child of the parent
   */
  public void setLevelResults( ProcessLevel level, ElementGraph parent, ElementGraph child )
    {
    setLevelResults( level, parent, Collections.singletonList( child ) );
    }

  /**
   * Adds the parent to the given level's parent set and sets its children on the result tree.
   *
   * @param level         the level the parent belongs to
   * @param parent        the parent graph
   * @param elementGraphs the children of the parent
   */
  public void setLevelResults( ProcessLevel level, ElementGraph parent, List<? extends ElementGraph> elementGraphs )
    {
    levelParents.get( level ).add( parent );
    resultTree.setChildren( parent, elementGraphs );
    }

  /**
   * Returns a newly created map of the parents at the given level to their children.
   * <p>
   * If parents were registered at the level, each is mapped to its children as held by the result
   * tree. Otherwise the level is seeded from the level returned by
   * {@code ProcessLevel.parent( level )}: every child of that level's parents becomes a key
   * mapped to a new empty list.
   * <p>
   * The returned map iterates in insertion order.
   *
   * @param level the level to retrieve
   * @return a new map, never {@code null}
   */
  public Map<ElementGraph, List<? extends ElementGraph>> getLevelResults( ProcessLevel level )
    {
    // insertion ordered so the order kept by the LinkedHashSet of parents is not lost to hashing
    Map<ElementGraph, List<? extends ElementGraph>> result = new LinkedHashMap<>();
    Set<? extends ElementGraph> parents = levelParents.get( level );

    if( !parents.isEmpty() )
      {
      for( ElementGraph parent : parents )
        result.put( parent, resultTree.getChildren( parent ) );

      return result;
      }

    // initialize the level
    Set<ElementGraph> top = levelParents.get( ProcessLevel.parent( level ) );

    // NOTE(review): presumably ProcessLevel.parent yields no level for the top most level, in
    // which case there is nothing to seed from -- confirm against ProcessLevel
    if( top == null )
      return result;

    for( ElementGraph parent : top )
      {
      List<? extends ElementGraph> children = resultTree.getChildren( parent );

      for( ElementGraph child : children )
        result.put( child, new ArrayList<ElementGraph>() );
      }

    return result;
    }

  /**
   * Returns the ordinals of the result tree edge between the given parent and child.
   * <p>
   * NOTE(review): unlike {@link #getPathFor(ElementGraph)} the edge is not checked for
   * {@code null} -- confirm the result tree never returns {@code null} here.
   *
   * @param parent the parent graph
   * @param child  the child graph
   * @return the ordinals of the edge
   */
  public int[] getPathFor( ElementGraph parent, ElementGraph child )
    {
    return resultTree.getEdge( parent, child ).getOrdinals();
    }

  /**
   * Returns the ordinals of the incoming result tree edge of the given graph.
   *
   * @param parent the graph to look up
   * @return the ordinals of the incoming edge, or an empty array if there is no incoming edge
   */
  public int[] getPathFor( ElementGraph parent )
    {
    ResultTree.Path incomingEdge = resultTree.getIncomingEdge( parent );

    if( incomingEdge == null )
      return new int[ 0 ];

    return incomingEdge.getOrdinals();
    }

  /**
   * @return the initial assembly given to {@link #initResult(FlowElementGraph)}
   */
  public FlowElementGraph getInitialAssembly()
    {
    return initialAssembly;
    }

  /**
   * @return the first child registered for the initial assembly at the {@link ProcessLevel#Assembly} level
   */
  public FlowElementGraph getAssemblyGraph()
    {
    Map<ElementGraph, List<? extends ElementGraph>> results = getLevelResults( ProcessLevel.Assembly );

    return (FlowElementGraph) Util.getFirst( results.get( getInitialAssembly() ) );
    }

  /**
   * @return the results of the {@link ProcessLevel#Step} level, see {@link #getLevelResults(ProcessLevel)}
   */
  public Map<ElementGraph, List<? extends ElementGraph>> getAssemblyToStepGraphMap()
    {
    return getLevelResults( ProcessLevel.Step );
    }

  /**
   * @return the results of the {@link ProcessLevel#Node} level, see {@link #getLevelResults(ProcessLevel)}
   */
  public Map<ElementGraph, List<? extends ElementGraph>> getStepToNodeGraphMap()
    {
    return getLevelResults( ProcessLevel.Node );
    }

  /**
   * @return the results of the {@link ProcessLevel#Pipeline} level, see {@link #getLevelResults(ProcessLevel)}
   */
  public Map<ElementGraph, List<? extends ElementGraph>> getNodeToPipelineGraphMap()
    {
    return getLevelResults( ProcessLevel.Pipeline );
    }

  /**
   * @return the number of keys in {@link #getStepToNodeGraphMap()}
   */
  public int getNumSteps()
    {
    return getStepToNodeGraphMap().size();
    }

  /**
   * @return the sum of the sizes of all value lists in {@link #getStepToNodeGraphMap()}
   */
  public int getNumNodes()
    {
    int nodes = 0;

    for( List<? extends ElementGraph> nodesList : getStepToNodeGraphMap().values() )
      nodes += nodesList.size();

    return nodes;
    }

  /**
   * Records the total duration as {@code end - begin}.
   *
   * @param begin the start timestamp
   * @param end   the end timestamp
   */
  public void setDuration( long begin, long end )
    {
    duration = end - begin;
    }

  /**
   * @return the total duration recorded by {@link #setDuration(long, long)}, zero if never set
   */
  public long getDuration()
    {
    return duration;
    }

  /**
   * Records the duration of the given phase as {@code end - begin}, replacing any prior value.
   *
   * @param phase the phase that was measured
   * @param begin the start timestamp
   * @param end   the end timestamp
   */
  public void setPhaseDuration( PlanPhase phase, long begin, long end )
    {
    phaseDurations.put( phase, end - begin );
    }

  /**
   * Records the duration of the given rule as {@code end - begin} under the rule's phase.
   * <p>
   * The duration is treated as milliseconds, if it exceeds {@link #THRESHOLD_SECONDS} an info
   * message is logged.
   *
   * @param rule  the rule that was measured
   * @param begin the start timestamp
   * @param end   the end timestamp
   * @throws IllegalStateException if a duration was already recorded for the rule name within the same phase
   */
  public void setRuleDuration( Rule rule, long begin, long end )
    {
    Map<String, Long> durations = ruleDurations.get( rule.getRulePhase() );

    if( durations == null )
      {
      durations = new LinkedHashMap<>();
      ruleDurations.put( rule.getRulePhase(), durations );
      }

    if( durations.containsKey( rule.getRuleName() ) )
      throw new IllegalStateException( "duplicate rule found: " + rule.getRuleName() );

    // named to not shadow the total duration field
    long ruleDuration = end - begin;

    // print these as we go
    if( ruleDuration > THRESHOLD_SECONDS * 1000L )
      LOG.info( "rule: {}, took longer than {} seconds: {}", rule.getRuleName(), THRESHOLD_SECONDS, formatDurationFromMillis( ruleDuration ) );

    durations.put( rule.getRuleName(), ruleDuration );
    }

  /**
   * Writes the recorded durations, converted to seconds with three decimals, to the given writer.
   * <p>
   * The total duration is written first, followed by one section per recorded phase listing the
   * phase duration, a separator line, and the duration of every rule recorded for that phase.
   *
   * @param writer the writer to print to, it is neither flushed nor closed
   */
  public void writeStats( PrintWriter writer )
    {
    // divide as double, a float can not represent large millisecond values exactly
    writer.format( "duration\t%.03f\n", ( duration / 1000d ) );

    writer.println();

    for( Map.Entry<PlanPhase, Long> phaseEntry : phaseDurations.entrySet() )
      {
      long phaseDuration = phaseEntry.getValue();

      writer.format( "%s\t%.03f\n", phaseEntry.getKey(), ( phaseDuration / 1000d ) );

      Map<String, Long> rules = ruleDurations.get( phaseEntry.getKey() );

      writer.println( "=======================" );

      // a phase may have a duration without any rule having been recorded for it
      if( rules != null )
        {
        for( Map.Entry<String, Long> ruleEntry : rules.entrySet() )
          {
          long ruleDuration = ruleEntry.getValue();
          writer.format( "%s\t%.03f\n", ruleEntry.getKey(), ( ruleDuration / 1000d ) );
          }
        }

      writer.println( "" );
      }
    }
  }