001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.rule.util; 022 023import java.io.File; 024import java.io.IOException; 025import java.io.PrintWriter; 026import java.nio.file.FileSystems; 027import java.nio.file.Path; 028import java.util.Collections; 029import java.util.EnumMap; 030import java.util.Iterator; 031import java.util.LinkedHashSet; 032import java.util.List; 033import java.util.Map; 034import java.util.Set; 035 036import cascading.flow.Flow; 037import cascading.flow.FlowNode; 038import cascading.flow.FlowStep; 039import cascading.flow.Flows; 040import cascading.flow.planner.FlowPlanner; 041import cascading.flow.planner.PlannerContext; 042import cascading.flow.planner.graph.ElementGraph; 043import cascading.flow.planner.graph.FlowElementGraph; 044import cascading.flow.planner.iso.GraphResult; 045import cascading.flow.planner.iso.assertion.Asserted; 046import cascading.flow.planner.iso.subgraph.Partitions; 047import cascading.flow.planner.iso.transformer.Transformed; 048import cascading.flow.planner.process.FlowNodeGraph; 049import cascading.flow.planner.process.FlowStepGraph; 050import cascading.flow.planner.rule.PlanPhase; 051import cascading.flow.planner.rule.ProcessLevel; 052import cascading.flow.planner.rule.Rule; 053import cascading.flow.planner.rule.RuleResult; 054import cascading.property.AppProps; 055import cascading.util.ProcessLogger; 056import cascading.util.Util; 057import cascading.util.Version; 058 059import static cascading.flow.planner.graph.ElementGraphs.canonicalHash; 060import static cascading.property.PropertyUtil.getStringProperty; 061 062public class TraceWriter 063 { 064 public static final String GREEN = "0000000000000000000400000000000000000000000000000000000000000000"; 065 public static final String ORANGE = "0000000000000000000E00000000000000000000000000000000000000000000"; 066 public static final String RED = "0000000000000000000C00000000000000000000000000000000000000000000"; 067 private String flowName; 068 private Map properties = Collections.emptyMap(); 069 private ProcessLogger processLogger = ProcessLogger.NULL; 070 071 Map<ProcessLevel, Set<Rule>> counts = new EnumMap<>( ProcessLevel.class ); 072 073 public TraceWriter() 074 { 075 } 076 077 public TraceWriter( Flow flow ) 078 { 079 if( flow == null ) 080 return; 081 082 this.properties = flow.getConfigAsProperties(); 083 this.flowName = Flows.getNameOrID( flow ); 084 this.processLogger = (ProcessLogger) flow; 085 } 086 087 protected Path getFullTransformTracePath( String registryName ) 088 { 089 Path planTransformTracePath = getPlanTransformTracePath(); 090 091 if( planTransformTracePath == null ) 092 return null; 093 094 return planTransformTracePath.resolve( registryName ); 095 } 096 097 public boolean isTransformTraceEnabled() 098 { 099 return !isTransformTraceDisabled(); 100 } 101 102 public boolean isTransformTraceDisabled() 103 { 104 return getPlanTransformTracePath() == null; 105 } 106 107 public void writeTransformPlan( String registryName, PlanPhase phase, Rule rule, int[] ordinals, GraphResult graphResult ) 108 { 109 if( isTransformTraceDisabled() ) 110 return; 111 112 String ruleName = String.format( "%02d-%s-%04d", phase.ordinal(), phase, addRule( rule ) ); 113 114 for( int i = 1; i < ordinals.length; i++ ) 115 ruleName = String.format( "%s-%04d", ruleName, ordinals[ i ] ); 116 117 ruleName = String.format( "%s-%s", ruleName, graphResult.getRuleName() ); 118 119 Path path = getFullTransformTracePath( registryName ).resolve( ruleName ); 120 121 graphResult.writeDOTs( path.toString() ); 122 123 markResult( graphResult, path ); 124 } 125 126 public void writeTransformPlan( String registryName, FlowElementGraph flowElementGraph, String name ) 127 { 128 if( isTransformTraceDisabled() ) 129 return; 130 131 if( flowElementGraph == null ) 132 { 133 processLogger.logInfo( "cannot write phase assembly trace, flowElementGraph is null" ); 134 return; 135 } 136 137 Path file = getFullTransformTracePath( registryName ).resolve( name ).normalize(); 138 139 processLogger.logInfo( "writing phase assembly trace: {}, to: {}", name, file ); 140 141 flowElementGraph.writeDOT( file.toString() ); 142 } 143 144 public void writeTransformPlan( String registryName, List<? extends ElementGraph> flowElementGraphs, PlanPhase phase, String subName ) 145 { 146 if( isTransformTraceDisabled() ) 147 return; 148 149 if( flowElementGraphs == null || flowElementGraphs.isEmpty() ) 150 { 151 processLogger.logInfo( "cannot write phase step trace, flowElementGraphs is empty" ); 152 return; 153 } 154 155 for( int i = 0; i < flowElementGraphs.size(); i++ ) 156 { 157 ElementGraph flowElementGraph = flowElementGraphs.get( i ); 158 String name = String.format( "%02d-%s-%s-%04d.dot", phase.ordinal(), phase, subName, i ); 159 160 Path file = getFullTransformTracePath( registryName ).resolve( name ).normalize(); 161 162 processLogger.logInfo( "writing phase step trace: {}, to: {}", name, file ); 163 164 flowElementGraph.writeDOT( file.toString() ); 165 } 166 } 167 168 public void writeTransformPlan( String registryName, Map<ElementGraph, List<? extends ElementGraph>> parentGraphsMap, Map<ElementGraph, List<? extends ElementGraph>> subGraphsMap, PlanPhase phase, String subName ) 169 { 170 if( isTransformTraceDisabled() ) 171 return; 172 173 if( parentGraphsMap == null || parentGraphsMap.isEmpty() ) 174 { 175 processLogger.logInfo( "cannot write phase node pipeline trace, parentGraphsMap is empty" ); 176 return; 177 } 178 179 int stepCount = 0; 180 for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : parentGraphsMap.entrySet() ) 181 { 182 int nodeCount = 0; 183 for( ElementGraph elementGraph : entry.getValue() ) 184 { 185 List<? extends ElementGraph> pipelineGraphs = subGraphsMap.get( elementGraph ); 186 187 if( pipelineGraphs == null ) 188 continue; 189 190 for( int i = 0; i < pipelineGraphs.size(); i++ ) 191 { 192 ElementGraph flowElementGraph = pipelineGraphs.get( i ); 193 String name = String.format( "%02d-%s-%s-%04d-%04d-%04d.dot", phase.ordinal(), phase, subName, stepCount, nodeCount, i ); 194 195 Path file = getFullTransformTracePath( registryName ).resolve( name ); 196 197 processLogger.logInfo( "writing phase node pipeline trace: {}, to: {}", name, file ); 198 199 flowElementGraph.writeDOT( file.toString() ); 200 } 201 202 nodeCount++; 203 } 204 205 stepCount++; 206 } 207 } 208 209 public void writeTransformPlan( String registryName, Map<ElementGraph, List<? extends ElementGraph>> subGraphsMap, PlanPhase phase, String subName ) 210 { 211 if( isTransformTraceDisabled() ) 212 return; 213 214 if( subGraphsMap == null || subGraphsMap.isEmpty() ) 215 { 216 processLogger.logInfo( "cannot write phase node trace, subGraphs is empty" ); 217 return; 218 } 219 220 int stepCount = 0; 221 for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : subGraphsMap.entrySet() ) 222 { 223 List<? extends ElementGraph> flowElementGraphs = entry.getValue(); 224 225 for( int i = 0; i < flowElementGraphs.size(); i++ ) 226 { 227 ElementGraph flowElementGraph = flowElementGraphs.get( i ); 228 String name = String.format( "%02d-%s-%s-%04d-%04d.dot", phase.ordinal(), phase, subName, stepCount, i ); 229 230 Path file = getFullTransformTracePath( registryName ).resolve( name ); 231 232 processLogger.logInfo( "writing phase node trace: {}, to: {}", name, file ); 233 234 flowElementGraph.writeDOT( file.toString() ); 235 } 236 237 stepCount++; 238 } 239 } 240 241 protected Path getPlanTracePath() 242 { 243 return applyScope( getStringProperty( System.getProperties(), properties, FlowPlanner.TRACE_PLAN_PATH ) ); 244 } 245 246 protected Path getPlanTransformTracePath() 247 { 248 return applyScope( getStringProperty( System.getProperties(), properties, FlowPlanner.TRACE_PLAN_TRANSFORM_PATH ) ); 249 } 250 251 protected Path getPlanStatsPath() 252 { 253 return applyScope( getStringProperty( System.getProperties(), properties, FlowPlanner.TRACE_STATS_PATH ) ); 254 } 255 256 private Path applyScope( String path ) 257 { 258 if( path == null ) 259 return null; 260 261 return FileSystems.getDefault().getPath( path, flowName ); 262 } 263 264 public void writeTracePlan( String registryName, String fileName, ElementGraph elementGraph ) 265 { 266 Path path = getPlanTracePath(); 267 268 if( path == null ) 269 return; 270 271 if( elementGraph == null ) 272 { 273 processLogger.logInfo( "cannot write trace element plan, elementGraph is null" ); 274 return; 275 } 276 277 if( registryName != null ) 278 path = path.resolve( registryName ); 279 280 Path filePath = path.resolve( String.format( "%s-%s.dot", fileName, canonicalHash( elementGraph ) ) ); 281 File file = filePath.toFile(); 282 283 processLogger.logInfo( "writing trace element plan: {}", file ); 284 285 String filename = file.toString(); 286 287 elementGraph.writeDOT( filename ); 288 } 289 290 public void writeTracePlan( String registryName, String fileName, FlowStepGraph stepGraph ) 291 { 292 Path path = getPlanTracePath(); 293 294 if( path == null ) 295 return; 296 297 if( stepGraph == null ) 298 { 299 processLogger.logInfo( "cannot write step plan, stepGraph is null" ); 300 return; 301 } 302 303 if( registryName != null ) 304 path = path.resolve( registryName ); 305 306 Path filePath = path.resolve( String.format( "%s.dot", fileName ) ); 307 File file = filePath.toFile(); 308 309 processLogger.logInfo( "writing trace step plan: {}", file ); 310 311 stepGraph.writeDOT( file.toString() ); 312 } 313 314 public void writeTracePlanSteps( String directoryName, FlowStepGraph stepGraph ) 315 { 316 if( stepGraph == null ) 317 { 318 processLogger.logInfo( "cannot write trace step plan, stepGraph is null" ); 319 return; 320 } 321 322 Iterator<FlowStep> iterator = stepGraph.getTopologicalIterator(); 323 324 while( iterator.hasNext() ) 325 writePlan( iterator.next(), directoryName ); 326 } 327 328 private void writePlan( FlowStep flowStep, String directoryName ) 329 { 330 Path path = getPlanTracePath(); 331 332 if( path == null ) 333 return; 334 335 int stepOrdinal = flowStep.getOrdinal(); 336 Path rootPath = path.resolve( directoryName ); 337 ElementGraph stepSubGraph = flowStep.getElementGraph(); 338 String stepGraphName = String.format( "%s/%04d-step-sub-graph-%s.dot", rootPath, stepOrdinal, canonicalHash( stepSubGraph ) ); 339 340 stepSubGraph.writeDOT( stepGraphName ); 341 342 FlowNodeGraph flowNodeGraph = flowStep.getFlowNodeGraph(); 343 344 String stepNodeElementGraphName = String.format( "%s/%04d-step-node-sub-graph.dot", rootPath, stepOrdinal ); 345 346 flowNodeGraph.writeDOTNested( stepNodeElementGraphName, stepSubGraph ); 347 348 String stepNodeGraphName = String.format( "%s/%04d-step-node-graph.dot", rootPath, stepOrdinal ); 349 350 flowNodeGraph.writeDOT( stepNodeGraphName ); 351 352 Iterator<FlowNode> iterator = flowNodeGraph.getOrderedTopologicalIterator(); 353 354 while( iterator.hasNext() ) 355 { 356 FlowNode flowNode = iterator.next(); 357 ElementGraph nodeGraph = flowNode.getElementGraph(); 358 int nodeOrdinal = flowNode.getOrdinal(); 359 String nodeGraphName = String.format( "%s/%04d-%04d-step-node-sub-graph-%s.dot", rootPath, stepOrdinal, nodeOrdinal, canonicalHash( nodeGraph ) ); 360 361 nodeGraph.writeDOT( nodeGraphName ); 362 363 List<? extends ElementGraph> pipelineGraphs = flowNode.getPipelineGraphs(); 364 365 for( int j = 0; j < pipelineGraphs.size(); j++ ) 366 { 367 ElementGraph pipelineGraph = pipelineGraphs.get( j ); 368 369 String pipelineGraphName = String.format( "%s/%04d-%04d-%04d-step-node-pipeline-sub-graph.dot", rootPath, stepOrdinal, nodeOrdinal, j ); 370 371 pipelineGraph.writeDOT( pipelineGraphName ); 372 } 373 } 374 } 375 376 public void writeFinal( String fileName, RuleResult ruleResult ) 377 { 378 Path path = getPlanTracePath(); 379 380 if( path == null ) 381 return; 382 383 Path filePath = path.resolve( String.format( "%s-%s.txt", fileName, ruleResult.getRegistry().getName() ) ); 384 File file = filePath.toFile(); 385 386 processLogger.logInfo( "writing final registry: {}", file ); 387 388 try (PrintWriter writer = new PrintWriter( file )) 389 { 390 writer.println( "filename names winning rule registry" ); 391 } 392 catch( IOException exception ) 393 { 394 processLogger.logError( "could not write final registry", exception ); 395 } 396 } 397 398 public void writeStats( PlannerContext plannerContext, RuleResult ruleResult ) 399 { 400 Path path = getPlanStatsPath(); 401 402 if( path == null ) 403 return; 404 405 File file = path.resolve( String.format( "planner-stats-%s-%s.txt", ruleResult.getRegistry().getName(), ruleResult.getResultStatus() ) ).toFile(); 406 407 processLogger.logInfo( "writing planner stats to: {}", file ); 408 409 file.getParentFile().mkdirs(); 410 411 try (PrintWriter writer = new PrintWriter( file )) 412 { 413 Flow flow = plannerContext.getFlow(); 414 415 Map<Object, Object> configAsProperties = flow.getConfigAsProperties(); 416 417 writer.format( "cascading version: %s, build: %s\n", emptyOrValue( Version.getReleaseFull() ), emptyOrValue( Version.getReleaseBuild() ) ); 418 writer.format( "application id: %s\n", emptyOrValue( AppProps.getApplicationID( configAsProperties ) ) ); 419 writer.format( "application name: %s\n", emptyOrValue( AppProps.getApplicationName( configAsProperties ) ) ); 420 writer.format( "application version: %s\n", emptyOrValue( AppProps.getApplicationVersion( configAsProperties ) ) ); 421 writer.format( "platform: %s\n", emptyOrValue( flow.getPlatformInfo() ) ); 422 writer.format( "frameworks: %s\n", emptyOrValue( AppProps.getApplicationFrameworks( configAsProperties ) ) ); 423 424 writer.println(); 425 426 ruleResult.writeStats( writer ); 427 } 428 catch( IOException exception ) 429 { 430 processLogger.logError( "could not write stats", exception ); 431 } 432 } 433 434 private static String emptyOrValue( Object value ) 435 { 436 if( value == null ) 437 return ""; 438 439 if( Util.isEmpty( value.toString() ) ) 440 return ""; 441 442 return value.toString(); 443 } 444 445 private void markResult( GraphResult graphResult, Path path ) 446 { 447 if( graphResult instanceof Transformed ) 448 markTransformed( (Transformed) graphResult, path ); 449 else if( graphResult instanceof Asserted ) 450 markAsserted( (Asserted) graphResult, path ); 451 else if( graphResult instanceof Partitions ) 452 markPartitioned( (Partitions) graphResult, path ); 453 } 454 455 private void markPartitioned( Partitions partition, Path path ) 456 { 457 String color = null; 458 459 if( partition.hasContractedMatches() ) 460 color = ORANGE; 461 462 if( partition.hasSubGraphs() ) 463 color = GREEN; 464 465 markFolder( path, color ); 466 } 467 468 private void markAsserted( Asserted asserted, Path path ) 469 { 470 if( asserted.getFirstAnchor() != null ) 471 markFolder( path, RED ); 472 } 473 474 private void markTransformed( Transformed transformed, Path path ) 475 { 476 if( transformed.getEndGraph() != null && !transformed.getBeginGraph().equals( transformed.getEndGraph() ) ) 477 markFolder( path, GREEN ); 478 } 479 480 private void markFolder( Path path, String color ) 481 { 482 if( !Util.IS_OSX ) 483 return; 484 485 if( color == null ) 486 return; 487 488 // xattr -wx com.apple.FinderInfo 0000000000000000000400000000000000000000000000000000000000000000 child-0-ContractedGraphTransformer 489 490 File file = path.toFile(); 491 492 File parentFile = file.getParentFile(); 493 String name = file.getName(); 494 495 String[] command = { 496 "xattr", 497 "-wx", 498 "com.apple.FinderInfo", 499 color, 500 name 501 }; 502 503 Util.execProcess( parentFile, command ); 504 } 505 506 int addRule( Rule rule ) 507 { 508 return addRule( rule.getRulePhase().getLevel(), rule ); 509 } 510 511 int addRule( ProcessLevel level, Rule rule ) 512 { 513 if( !counts.containsKey( level ) ) 514 counts.put( level, new LinkedHashSet<Rule>() ); 515 516 Set<Rule> rules = counts.get( level ); 517 518 rules.add( rule ); 519 520 return rules.size() - 1; 521 } 522 }