/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner.rule;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.planner.FlowPlanner;
import cascading.flow.planner.PlannerContext;
import cascading.flow.planner.PlannerException;
import cascading.flow.planner.graph.FlowElementGraph;
import cascading.flow.planner.rule.util.TraceWriter;
import cascading.util.ProcessLogger;
import cascading.util.Util;

import static cascading.util.Util.formatDurationFromMillis;
import static java.util.Collections.synchronizedList;
import static java.util.Collections.synchronizedSet;

/**
 * Runs a planner once per {@link RuleRegistry} in a {@link RuleRegistrySet}, concurrently, bins each
 * {@link RuleResult} by its result status, and selects one successful result to return.
 * <p>
 * At most {@link #MAX_CONCURRENT_PLANNERS} registries are planned at the same time. Planning as a whole is
 * bounded by {@link RuleRegistrySet#getPlannerTimeoutSec()}; registries still running past that period are
 * cancelled.
 */
public class RuleSetExec
  {
  /** Upper bound on the number of threads used to plan registries in parallel. */
  public static final int MAX_CONCURRENT_PLANNERS = 5;

  /** Default planner timeout; not read by this class. NOTE(review): presumably seconds (ten minutes), confirm at the use site. */
  public static final int DEFAULT_TIMEOUT = 10 * 60;

  /** Orders results by fewest steps, then by fewest nodes. */
  public static final Comparator<RuleResult> DEFAULT_PLAN_COMPARATOR = new Comparator<RuleResult>()
    {
    @Override
    public int compare( RuleResult lhs, RuleResult rhs )
      {
      // Integer.compare avoids the overflow that plain subtraction is exposed to
      int c = Integer.compare( lhs.getNumSteps(), rhs.getNumSteps() );

      if( c != 0 )
        return c;

      return Integer.compare( lhs.getNumNodes(), rhs.getNumNodes() );
      }

    @Override
    public String toString()
      {
      return "default comparator: selects plan with fewest steps and fewest nodes";
      }
    };

  private TraceWriter traceWriter;
  private FlowPlanner flowPlanner;
  private Flow flow;
  private RuleRegistrySet registrySet;
  private FlowDef flowDef;
  private FlowElementGraph flowElementGraph;

  // populated by exec(); all are synchronized wrappers as they are shared with the planner threads
  Set<Callable> running;
  List<RuleResult> success;
  List<RuleResult> unsupported;
  List<RuleResult> illegal;
  List<RuleResult> interrupted;

  /**
   * @param traceWriter      receives trace plans and statistics for every executed registry
   * @param flowPlanner      planner used to configure registries and verify successful results
   * @param flow             the flow being planned; also used as the {@link ProcessLogger}
   * @param registrySet      the registries to execute, along with selection and timeout settings
   * @param flowDef          the flow definition handed to each {@link PlannerContext}
   * @param flowElementGraph the graph each registry is executed against
   */
  public RuleSetExec( TraceWriter traceWriter, FlowPlanner flowPlanner, Flow flow, RuleRegistrySet registrySet, FlowDef flowDef, FlowElementGraph flowElementGraph )
    {
    this.traceWriter = traceWriter;
    this.flowPlanner = flowPlanner;
    this.flow = flow;
    this.registrySet = registrySet;
    this.flowDef = flowDef;
    this.flowElementGraph = flowElementGraph;
    }

  /**
   * Returns the flow cast to {@link ProcessLogger}.
   *
   * @throws ClassCastException if the flow instance does not implement {@link ProcessLogger}
   */
  protected ProcessLogger getFlowLogger()
    {
    return (ProcessLogger) flow;
    }

  /**
   * Returns the comparator configured on the registry set, or {@link #DEFAULT_PLAN_COMPARATOR} if none is set.
   */
  protected Comparator<RuleResult> getPlanComparator()
    {
    Comparator<RuleResult> configured = registrySet.getPlanComparator();

    if( configured != null )
      return configured;

    return DEFAULT_PLAN_COMPARATOR;
    }

  /**
   * Returns a comparator ordering results by the position of their registry within the registry set.
   */
  protected Comparator<RuleResult> getOrderComparator()
    {
    return new Comparator<RuleResult>()
      {
      @Override
      public int compare( RuleResult lhs, RuleResult rhs )
        {
        // preserve order of preference from rule registry if all things are equal
        return Integer.compare( registrySet.indexOf( lhs.getRegistry() ), registrySet.indexOf( rhs.getRegistry() ) );
        }
      };
    }

  /**
   * Executes all registries, reports the unsupported, illegal and interrupted ones, and returns the selected
   * successful result.
   *
   * @return the selected successful result
   * @throws IllegalStateException if no registry produced a successful result and no failure was rethrown first
   */
  public RuleResult exec()
    {
    running = synchronizedSet( new HashSet<Callable>() );
    success = synchronizedList( new ArrayList<RuleResult>() );
    unsupported = synchronizedList( new ArrayList<RuleResult>() );
    illegal = synchronizedList( new ArrayList<RuleResult>() );
    interrupted = synchronizedList( new ArrayList<RuleResult>() );

    List<Callable<RuleResult>> callables = createCallables();

    submitCallables( callables );

    // each of these may rethrow the planner exception of the first failed result
    notifyUnsupported();
    notifyIllegal();
    notifyInterrupted();

    return selectSuccess();
    }

  /**
   * Executes the given registry against the flow element graph, writes trace output and statistics, and
   * verifies the result if it was successful.
   *
   * @param ruleRegistry the registry to execute
   * @return the result, which may be unsuccessful; its planner exception is left for the caller to handle
   */
  protected RuleResult execPlannerFor( RuleRegistry ruleRegistry )
    {
    flowPlanner.configRuleRegistryDefaults( ruleRegistry );

    String registryName = ruleRegistry.getName();

    RuleExec ruleExec = new RuleExec( traceWriter, ruleRegistry );

    PlannerContext plannerContext = new PlannerContext( ruleRegistry, flowPlanner, flowDef, flow, traceWriter.isTransformTraceEnabled() );

    RuleResult ruleResult = ruleExec.exec( plannerContext, flowElementGraph );

    getFlowLogger().logInfo( "executed rule registry: {}, completed as: {}, in: {}", registryName, ruleResult.getResultStatus(), formatDurationFromMillis( ruleResult.getDuration() ) );

    traceWriter.writeTracePlan( registryName, "completed-flow-element-graph", ruleResult.getAssemblyGraph() );
    traceWriter.writeStats( plannerContext, ruleResult );

    Exception plannerException;

    if( ruleResult.isSuccess() )
      plannerException = flowPlanner.verifyResult( ruleResult );
    else
      plannerException = ruleResult.getPlannerException(); // rethrown later by the notify* methods, not here

    // instanceof is already false for null, no separate null check required
    if( plannerException instanceof PlannerException )
      {
      PlannerException failure = (PlannerException) plannerException;

      if( failure.getElementGraph() != null )
        traceWriter.writeTracePlan( registryName, "failed-source-element-graph", failure.getElementGraph() );
      }

    // only a failed verification of an otherwise successful result is thrown from here
    if( ruleResult.isSuccess() && plannerException != null )
      rethrow( plannerException );

    return ruleResult;
    }

  /**
   * Submits all callables to a fixed size thread pool and bins results as they complete.
   * <p>
   * Stops early when the planner timeout elapses, or on the first successful result if the registry set
   * selects {@link RuleRegistrySet.Select#FIRST}. Any futures still outstanding are then cancelled, and this
   * method waits up to 30 seconds (60 polls of 500ms) for the running callables to exit.
   *
   * @param callables the planner tasks to run
   * @return the futures that had not been consumed when this method returned; empty if all completed
   */
  protected Set<Future<RuleResult>> submitCallables( List<Callable<RuleResult>> callables )
    {
    Set<Future<RuleResult>> futures = new HashSet<>();

    // Executors.newFixedThreadPool rejects a pool size of zero; with nothing to run there is nothing to wait on
    if( callables.isEmpty() )
      return futures;

    int size = Math.min( MAX_CONCURRENT_PLANNERS, callables.size() );

    ExecutorService executor = Executors.newFixedThreadPool( size );
    ExecutorCompletionService<RuleResult> completionService = new ExecutorCompletionService<>( executor );

    RuleRegistrySet.Select select = registrySet.getSelect();
    long timeoutMillis = TimeUnit.SECONDS.toMillis( registrySet.getPlannerTimeoutSec() );
    long startNanos = System.nanoTime(); // monotonic, unaffected by wall clock adjustments

    for( Callable<RuleResult> callable : callables )
      futures.add( completionService.submit( callable ) );

    // no further submissions; pool threads exit once the submitted tasks complete
    executor.shutdown();

    try
      {
      boolean timedOut = false;

      while( !futures.isEmpty() )
        {
        // the timeout bounds the whole planning run, so only wait for what is left of it
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis( System.nanoTime() - startNanos );
        long remainingMillis = Math.max( 0L, timeoutMillis - elapsedMillis );

        Future<RuleResult> future = completionService.poll( remainingMillis, TimeUnit.MILLISECONDS );

        timedOut = future == null;

        if( timedOut )
          break;

        futures.remove( future );

        boolean succeeded = binResult( future.get() );

        if( succeeded && select == RuleRegistrySet.Select.FIRST )
          break;
        }

      if( !futures.isEmpty() )
        {
        if( timedOut )
          getFlowLogger().logWarn( "planner cancelling long running registries past timeout period: {}, see RuleRegistrySet#setPlannerTimeoutSec() to change timeout", formatDurationFromMillis( registrySet.getPlannerTimeoutSec() * 1000 ) );
        else
          getFlowLogger().logInfo( "first registry completed, planner cancelling remaining running registries: {}, successful: {}", futures.size(), success.size() );

        for( Future<RuleResult> current : futures )
          current.cancel( true );

        int timeout = 0;

        // give cancelled callables a bounded amount of time to unwind
        while( !running.isEmpty() && timeout < 60 )
          {
          Util.safeSleep( 500 );
          timeout++;
          }
        }
      }
    catch( InterruptedException exception )
      {
      getFlowLogger().logError( "planner thread interrupted", exception );

      // restore the interrupt status so code further up the stack can observe it
      Thread.currentThread().interrupt();

      rethrow( exception );
      }
    catch( ExecutionException exception )
      {
      rethrow( exception.getCause() );
      }

    return futures;
    }

  /**
   * Creates one callable per registry in the registry set, in registry order.
   */
  protected List<Callable<RuleResult>> createCallables()
    {
    List<Callable<RuleResult>> callables = new ArrayList<>();

    for( RuleRegistry ruleRegistry : registrySet.ruleRegistries )
      callables.add( createCallable( ruleRegistry ) );

    return callables;
    }

  /**
   * Logs all successful results and returns the preferred one.
   * <p>
   * When there is more than one, results are first sorted by registry order and then by the plan comparator;
   * as the sort is stable, registry order breaks ties between plans that compare as equal.
   *
   * @throws IllegalStateException if there are no successful results
   */
  private RuleResult selectSuccess()
    {
    if( success.isEmpty() )
      throw new IllegalStateException( "no planner results from registry set" );

    for( RuleResult ruleResult : success )
      getFlowLogger().logInfo( "rule registry: {}, supports assembly with steps: {}, nodes: {}", ruleResult.getRegistry().getName(), ruleResult.getNumSteps(), ruleResult.getNumNodes() );

    if( success.size() != 1 )
      {
      // sort is stable
      Collections.sort( success, getOrderComparator() );
      Collections.sort( success, getPlanComparator() );
      }

    RuleResult ruleResult = success.get( 0 );

    if( registrySet.getSelect() == RuleRegistrySet.Select.FIRST )
      getFlowLogger().logInfo( "rule registry: {}, result was selected as first successful", ruleResult.getRegistry().getName() );
    else if( registrySet.getSelect() == RuleRegistrySet.Select.COMPARED )
      getFlowLogger().logInfo( "rule registry: {}, result was selected using: \'{}\'", ruleResult.getRegistry().getName(), getPlanComparator().toString() );

    return ruleResult;
    }

  /**
   * Logs every unsupported result, then rethrows the first one's planner exception if failures are not
   * ignored, or if no result of any other kind was collected.
   */
  private void notifyUnsupported()
    {
    if( unsupported.isEmpty() )
      return;

    for( RuleResult ruleResult : unsupported )
      getFlowLogger().logInfo( "rule registry: {}, does not support assembly", ruleResult.getRegistry().getName() );

    boolean onlyUnsupported = success.isEmpty() && illegal.isEmpty() && interrupted.isEmpty();

    if( !registrySet.isIgnoreFailed() || onlyUnsupported )
      rethrow( unsupported.get( 0 ).getPlannerException() );
    }

  /**
   * Logs every illegal result, then rethrows the first one's planner exception if failures are not ignored,
   * or if there is no successful result.
   */
  private void notifyIllegal()
    {
    if( illegal.isEmpty() )
      return;

    for( RuleResult ruleResult : illegal )
      getFlowLogger().logInfo( "rule registry: {}, found assembly to be malformed", ruleResult.getRegistry().getName() );

    if( !registrySet.isIgnoreFailed() || success.isEmpty() )
      rethrow( illegal.get( 0 ).getPlannerException() );
    }

  /**
   * Logs every interrupted result. Throws a {@link PlannerException} if every registry was interrupted;
   * otherwise rethrows the first one's planner exception if failures are not ignored, or if there is no
   * successful result.
   */
  private void notifyInterrupted()
    {
    if( interrupted.isEmpty() )
      return;

    for( RuleResult ruleResult : interrupted )
      getFlowLogger().logInfo( "rule registry: {}, planned longer than default duration, was cancelled", ruleResult.getRegistry().getName() );

    if( interrupted.size() == registrySet.size() )
      throw new PlannerException( "planner registry timeout exceeded for all registries: " + formatDurationFromMillis( registrySet.getPlannerTimeoutSec() * 1000 ) );

    if( !registrySet.isIgnoreFailed() || success.isEmpty() )
      rethrow( interrupted.get( 0 ).getPlannerException() );
    }

  /**
   * Wraps {@link #execPlannerFor(RuleRegistry)} in a callable that registers itself in {@code running} for
   * the duration of the call, so {@link #submitCallables(List)} can tell when cancelled work has exited.
   */
  protected Callable<RuleResult> createCallable( final RuleRegistry ruleRegistry )
    {
    return new Callable<RuleResult>()
      {
      @Override
      public RuleResult call() throws Exception
        {
        running.add( this );

        try
          {
          return execPlannerFor( ruleRegistry );
          }
        finally
          {
          running.remove( this );
          }
        }
      };
    }

  /**
   * Adds the result to the list matching its result status.
   *
   * @return true only if the result was a success
   */
  protected boolean binResult( RuleResult ruleResult )
    {
    switch( ruleResult.getResultStatus() )
      {
      case SUCCESS:
        success.add( ruleResult );
        return true;

      case UNSUPPORTED:
        unsupported.add( ruleResult );
        break;

      case ILLEGAL:
        illegal.add( ruleResult );
        break;

      case INTERRUPTED:
        interrupted.add( ruleResult );
        break;
      }

    return false;
    }

  /**
   * Throws the given throwable as is when it is an {@link Error} or {@link RuntimeException}, otherwise
   * wrapped in a {@link PlannerException}. Never returns normally.
   */
  private void rethrow( Throwable throwable )
    {
    if( throwable instanceof Error )
      throw (Error) throwable;

    if( throwable instanceof RuntimeException )
      throw (RuntimeException) throwable;

    throw new PlannerException( throwable );
    }
  }