001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.planner.process; 023 024import java.io.FileWriter; 025import java.io.IOException; 026import java.io.Writer; 027import java.util.ArrayList; 028import java.util.Collection; 029import java.util.Comparator; 030import java.util.HashMap; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.PriorityQueue; 036import java.util.Set; 037 038import cascading.flow.FlowElement; 039import cascading.flow.planner.Scope; 040import cascading.flow.planner.graph.AnnotatedGraph; 041import cascading.flow.planner.graph.ElementGraph; 042import cascading.flow.planner.graph.ElementGraphs; 043import cascading.flow.planner.graph.Extent; 044import cascading.pipe.Group; 045import cascading.tap.Tap; 046import cascading.util.EnumMultiMap; 047import cascading.util.Util; 048import cascading.util.jgrapht.IntegerNameProvider; 049import cascading.util.jgrapht.VertexNameProvider; 050import org.jgrapht.graph.SimpleDirectedGraph; 051import org.jgrapht.traverse.TopologicalOrderIterator; 052import org.slf4j.Logger; 053import 
org.slf4j.LoggerFactory;

import static cascading.util.Util.createIdentitySet;

/**
 * Base implementation of {@link ProcessGraph} backed by a jgrapht {@link SimpleDirectedGraph}.
 * <p>
 * Vertices are {@link ProcessModel} instances and edges are {@link ProcessEdge} instances. As processes are added via
 * {@link #addVertex}, their source elements, sink elements, and trap maps are accumulated. {@link #bindEdges()} then
 * connects processes that share intermediate elements and prunes the source/sink sets down to the elements on the
 * outer edges of this graph.
 * <p>
 * Most of the graph manipulation methods simply delegate to the underlying jgrapht graph.
 *
 * @param <Process> the concrete process model type held as vertices
 */
public abstract class BaseProcessGraph<Process extends ProcessModel> implements ProcessGraph<Process>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( BaseProcessGraph.class );

  /** The underlying directed graph of processes (package-private). */
  final SimpleDirectedGraph<Process, ProcessEdge> graph;

  /** Accumulated source elements; pruned to the outer sources of this graph by {@link #bindEdges()}. */
  protected Set<FlowElement> sourceElements = createIdentitySet();
  /** Accumulated sink elements; pruned to the outer sinks of this graph by {@link #bindEdges()}. */
  protected Set<FlowElement> sinkElements = createIdentitySet();
  /** Lazily computed cache of the Taps in {@link #sourceElements}; null when it must be recomputed. */
  private Set<Tap> sourceTaps;
  /** Lazily computed cache of the Taps in {@link #sinkElements}; null when it must be recomputed. */
  private Set<Tap> sinkTaps;
  /** Union of the trap maps of every process added. */
  protected Map<String, Tap> trapsMap = new HashMap<>();

  /** Creates an empty process graph whose edges are {@link ProcessEdge} instances. */
  @SuppressWarnings( "unchecked" ) // SimpleDirectedGraph is constructed raw because ProcessEdge is used raw
  public BaseProcessGraph()
    {
    graph = new SimpleDirectedGraph( ProcessEdge.class );
    }

  /**
   * Adds the given process as a vertex and merges its source elements, sink elements, and trap map into this graph's
   * accumulated state.
   *
   * @param process the process to add
   * @return the result of adding the vertex to the underlying graph
   */
  @Override
  public boolean addVertex( Process process )
    {
    sourceElements.addAll( process.getSourceElements() );
    sinkElements.addAll( process.getSinkElements() );
    trapsMap.putAll( process.getTrapMap() );

    // the element sets just changed, so any cached tap views derived from them are stale
    sourceTaps = null;
    sinkTaps = null;

    return graph.addVertex( process );
    }

  /**
   * Connects the added processes and prunes the outer source and sink element sets.
   * <p>
   * For every ordered pair of distinct processes, sink elements are removed from {@link #getSourceElements()}, and
   * source elements are removed from {@link #getSinkElements()}. Only elements on the outer edges of this graph remain.
   * With a single process, no pair exists and nothing is removed.
   * <p>
   * An edge is then added from process A to process B for the elements A sinks that B also sources.
   * NOTE(review): the underlying graph is a SimpleDirectedGraph, so presumably only the first such edge per pair is
   * retained. Confirm that this is intended.
   * <p>
   * Finally, the cached source/sink tap sets are cleared so they are recomputed from the pruned element sets.
   */
  public void bindEdges()
    {
    for( Process sinkProcess : vertexSet() )
      {
      for( Process sourceProcess : vertexSet() )
        {
        if( sourceProcess == sinkProcess )
          continue;

        // outer edge sources and sinks to this graph
        sourceElements.removeAll( sinkProcess.getSinkElements() );
        sinkElements.removeAll( sourceProcess.getSourceElements() );
        }
      }

    for( Process sinkProcess : vertexSet() )
      {
      for( Process sourceProcess : vertexSet() )
        {
        if( sourceProcess == sinkProcess )
          continue;

        for( FlowElement intermediate : sourceProcess.getSinkElements() )
          {
          if( sinkProcess.getSourceElements().contains( intermediate ) )
            addEdge( sourceProcess, sinkProcess, new ProcessEdge<>( sourceProcess, intermediate, sinkProcess ) );
          }
        }
      }

    // clear tap map caches
    sourceTaps = null;
    sinkTaps = null;
    }

  /** @return the (live, mutable) set of source elements of this graph */
  @Override
  public Set<FlowElement> getSourceElements()
    {
    return sourceElements;
    }

  /** @return the (live, mutable) set of sink elements of this graph */
  @Override
  public Set<FlowElement> getSinkElements()
    {
    return sinkElements;
    }

  /**
   * Returns the {@link Tap} instances among {@link #getSourceElements()}.
   * The result is cached until the element sets next change.
   *
   * @return identity set of source taps
   */
  @Override
  public Set<Tap> getSourceTaps()
    {
    if( sourceTaps == null )
      sourceTaps = Util.narrowIdentitySet( Tap.class, getSourceElements() );

    return sourceTaps;
    }

  /**
   * Maps the name of every outgoing {@link Scope} of each source tap to that tap. The scopes are read from the element
   * graph of every process that lists the tap among its own source taps.
   *
   * @return a new map of scope name to source tap
   */
  @Override
  public Map<String, Tap> getSourceTapsMap()
    {
    Map<String, Tap> result = new HashMap<>();
    Set<Tap> taps = getSourceTaps();

    for( Tap sourceTap : taps )
      {
      for( Process process : graph.vertexSet() )
        {
        if( !process.getSourceTaps().contains( sourceTap ) )
          continue;

        ElementGraph elementGraph = process.getElementGraph();

        for( Scope scope : elementGraph.outgoingEdgesOf( sourceTap ) )
          result.put( scope.getName(), sourceTap );
        }
      }

    return result;
    }

  /**
   * Returns the {@link Tap} instances among {@link #getSinkElements()}.
   * The result is cached until the element sets next change.
   *
   * @return identity set of sink taps
   */
  @Override
  public Set<Tap> getSinkTaps()
    {
    if( sinkTaps == null )
      sinkTaps = Util.narrowIdentitySet( Tap.class, getSinkElements() );

    return sinkTaps;
    }

  /**
   * Maps the name of every incoming {@link Scope} of each sink tap to that tap. The scopes are read from the element
   * graph of every process that lists the tap among its own sink taps.
   *
   * @return a new map of scope name to sink tap
   */
  @Override
  public Map<String, Tap> getSinkTapsMap()
    {
    Map<String, Tap> result = new HashMap<>();
    Set<Tap> taps = getSinkTaps();

    for( Tap sinkTap : taps )
      {
      for( Process process : graph.vertexSet() )
        {
        if( !process.getSinkTaps().contains( sinkTap ) )
          continue;

        ElementGraph elementGraph = process.getElementGraph();

        for( Scope scope : elementGraph.incomingEdgesOf( sinkTap ) )
          result.put( scope.getName(), sinkTap );
        }
      }

    return result;
    }

  /** @return the (live, mutable) union of the trap maps of every process added */
  @Override
  public Map<String, Tap> getTrapsMap()
    {
    return trapsMap;
    }

  /**
   * Returns a topological iterator that breaks ties among ready processes by ascending submit priority.
   *
   * @return topological iterator over the processes
   */
  @Override
  public Iterator<Process> getTopologicalIterator()
    {
    return getOrderedTopologicalIterator( new Comparator<Process>()
      {
      @Override
      public int compare( Process lhs, Process rhs )
        {
        return Integer.compare( lhs.getSubmitPriority(), rhs.getSubmitPriority() );
        }
      } );
    }

  /**
   * Returns a topological iterator that breaks ties among ready processes by ascending ordinal.
   *
   * @return topological iterator over the processes
   */
  @Override
  public Iterator<Process> getOrdinalTopologicalIterator()
    {
    return getOrderedTopologicalIterator( new Comparator<Process>()
      {
      @Override
      public int compare( Process lhs, Process rhs )
        {
        return Integer.compare( lhs.getOrdinal(), rhs.getOrdinal() );
        }
      } );
    }

  /**
   * Returns a topological iterator whose queue of ready processes is a {@link PriorityQueue} ordered by the given
   * comparator.
   *
   * @param comparator ordering applied among processes that are ready at the same time
   * @return topological iterator over the processes
   */
  @Override
  public Iterator<Process> getOrderedTopologicalIterator( Comparator<Process> comparator )
    {
    return new TopologicalOrderIterator<>( graph, new PriorityQueue<>( 10, comparator ) );
    }

  /** @return identity set of the element graph of every process */
  @Override
  public Set<ElementGraph> getElementGraphs()
    {
    Set<ElementGraph> results = createIdentitySet();

    for( Process process : vertexSet() )
      results.add( process.getElementGraph() );

    return results;
    }

  /**
   * @param flowElement element to look up
   * @return element graphs of the processes whose element graph contains the given element as a vertex
   */
  @Override
  public List<ElementGraph> getElementGraphs( FlowElement flowElement )
    {
    List<ElementGraph> elementGraphs = new ArrayList<>();

    for( Process elementProcess : getElementProcesses( flowElement ) )
      elementGraphs.add( elementProcess.getElementGraph() );

    return elementGraphs;
    }

  /**
   * @param flowElement element to look up
   * @return processes whose element graph contains the given element as a vertex
   */
  @Override
  public List<Process> getElementProcesses( FlowElement flowElement )
    {
    List<Process> processes = new ArrayList<>();

    for( Process process : vertexSet() )
      {
      if( process.getElementGraph().vertexSet().contains( flowElement ) )
        processes.add( process );
      }

    return processes;
    }

  /**
   * @param scope edge to look up
   * @return element graphs of the processes whose element graph contains the given scope as an edge
   */
  @Override
  public List<ElementGraph> getElementGraphs( Scope scope )
    {
    List<ElementGraph> elementGraphs = new ArrayList<>();

    for( Process elementProcess : getElementProcesses( scope ) )
      elementGraphs.add( elementProcess.getElementGraph() );

    return elementGraphs;
    }

  /**
   * @param scope edge to look up
   * @return processes whose element graph contains the given scope as an edge
   */
  @Override
  public List<Process> getElementProcesses( Scope scope )
    {
    List<Process> processes = new ArrayList<>();

    for( Process process : vertexSet() )
      {
      if( process.getElementGraph().edgeSet().contains( scope ) )
        processes.add( process );
      }

    return processes;
    }

  /**
   * @param flowElement element to look up
   * @return processes that list the given element among their sink elements
   */
  @Override
  public List<Process> getElementSourceProcesses( FlowElement flowElement )
    {
    List<Process> sources = new ArrayList<>();

    for( Process process : vertexSet() )
      {
      if( process.getSinkElements().contains( flowElement ) )
        sources.add( process );
      }

    return sources;
    }

  /**
   * @param flowElement element to look up
   * @return processes that list the given element among their source elements
   */
  @Override
  public List<Process> getElementSinkProcesses( FlowElement flowElement )
    {
    List<Process> sinks = new ArrayList<>();

    for( Process process : vertexSet() )
      {
      if( process.getSourceElements().contains( flowElement ) )
        sinks.add( process );
      }

    return sinks;
    }

  /** @return identity set union of the source elements of every process, including intermediate elements */
  @Override
  public Set<FlowElement> getAllSourceElements()
    {
    Set<FlowElement> results = createIdentitySet();

    for( Process process : vertexSet() )
      results.addAll( process.getSourceElements() );

    return results;
    }

  /** @return identity set union of the sink elements of every process, including intermediate elements */
  @Override
  public Set<FlowElement> getAllSinkElements()
    {
    Set<FlowElement> results = createIdentitySet();

    for( Process process : vertexSet() )
      results.addAll( process.getSinkElements() );

    return results;
    }

  /**
   * Collects the annotations of every process element graph that is an {@link AnnotatedGraph}.
   *
   * @return a new multi-map holding the merged annotations
   */
  public EnumMultiMap<FlowElement> getAnnotations()
    {
    EnumMultiMap<FlowElement> annotations = new EnumMultiMap<>();

    for( Process process : vertexSet() )
      {
      ElementGraph elementGraph = process.getElementGraph();

      if( elementGraph instanceof AnnotatedGraph )
        annotations.addAll( ( (AnnotatedGraph) elementGraph ).getAnnotations() );
      }

    return annotations;
    }

  /**
   * Returns all elements from the given ElementGraph that belong to two or more processes and are not source or sink
   * elements connecting processes. The {@link Extent} head and tail markers are excluded.
   *
   * @param elementGraph graph whose vertices are examined
   * @return identity set of duplicated elements
   */
  @Override
  public Set<FlowElement> getDuplicatedElements( ElementGraph elementGraph )
    {
    Set<FlowElement> results = createIdentitySet();

    for( FlowElement flowElement : elementGraph.vertexSet() )
      {
      if( getElementProcesses( flowElement ).size() > 1 )
        results.add( flowElement );
      }

    results.remove( Extent.head );
    results.remove( Extent.tail );
    results.removeAll( getAllSourceElements() );
    results.removeAll( getAllSinkElements() );

    return results;
    }

  /** @return identity set of the element graphs of the processes returned by {@link #getIdentityProcesses()} */
  @Override
  public Set<ElementGraph> getIdentityElementGraphs()
    {
    Set<ElementGraph> results = createIdentitySet();

    for( Process process : getIdentityProcesses() )
      results.add( process.getElementGraph() );

    return results;
    }

  /**
   * Returns the set of processes that perform no internal operations, as decided by
   * {@code ProcessModels.isIdentity}.
   * <p>
   * For example, a FlowNode that only has a Merge source and a GroupBy sink.
   *
   * @return set of identity processes
   */
  @Override
  public Set<Process> getIdentityProcesses()
    {
    Set<Process> results = new HashSet<>();

    for( Process process : vertexSet() )
      {
      if( ProcessModels.isIdentity( process ) )
        results.add( process );
      }

    return results;
    }

  /**
   * Method writeDOT writes this process graph to a DOT file for easy visualization and debugging.
   *
   * @param filename of type String
   */
  @Override
  public void writeDOT( String filename )
    {
    printProcessGraph( filename );
    }

  /**
   * Writes this process graph as DOT to the given file. Any IOException is logged rather than thrown.
   *
   * @param filename path of the file to write
   */
  protected void printProcessGraph( String filename )
    {
    // try-with-resources closes the writer even when writing the DOT output fails part way
    try( Writer writer = new FileWriter( filename ) )
      {
      Util.writeDOT( writer, graph, new IntegerNameProvider<Process>(), new VertexNameProvider<Process>()
        {
        public String getVertexName( Process process )
          {
          return createVertexLabel( process );
          }
        }, null );
      }
    catch( IOException exception )
      {
      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
      }
    }

  /**
   * Builds the DOT label for a process. The label holds its name, every source tap, every non-empty group name, and
   * every sink tap, separated by escaped newlines. Double quotes are replaced with single quotes.
   */
  private String createVertexLabel( Process process )
    {
    StringBuilder label = new StringBuilder( "[" ).append( process.getName() ).append( "]" );

    Set<Tap> sources = process.getSourceTaps();

    for( Tap source : sources )
      label.append( "\\nsrc:[" ).append( source.getIdentifier() ).append( "]" );

    Collection<Group> groups = process.getGroups();

    for( Group group : groups )
      {
      String groupName = group.getName();

      if( groupName.length() != 0 )
        label.append( "\\ngrp:" ).append( groupName );
      }

    Set<Tap> sinks = process.getSinkTaps();

    // append every sink tap, mirroring the source taps above (not just the last one)
    for( Tap sink : sinks )
      label.append( "\\nsnk:[" ).append( sink.getIdentifier() ).append( "]" );

    return label.toString().replace( '"', '\'' );
    }

  /**
   * Writes the given element graph as DOT, nested within this process graph.
   *
   * @param filename path of the file to write
   * @param graph    element graph to render
   */
  @Override
  public void writeDOTNested( String filename, ElementGraph graph )
    {
    ElementGraphs.printProcessGraph( filename, graph, this );
    }

  /** Delegates to the underlying graph's {@code containsEdge(source, target)}. */
  public boolean containsEdge( Process sourceVertex, Process targetVertex )
    {
    return graph.containsEdge( sourceVertex, targetVertex );
    }

  /** Delegates to the underlying graph's {@code removeAllEdges(edges)}. */
  public boolean removeAllEdges( Collection<? extends ProcessEdge> edges )
    {
    return graph.removeAllEdges( edges );
    }

  /** Delegates to the underlying graph's {@code removeAllEdges(source, target)}. */
  public Set<ProcessEdge> removeAllEdges( Process sourceVertex, Process targetVertex )
    {
    return graph.removeAllEdges( sourceVertex, targetVertex );
    }

  /** Delegates to the underlying graph's {@code removeAllVertices(vertices)}. */
  public boolean removeAllVertices( Collection<? extends Process> vertices )
    {
    return graph.removeAllVertices( vertices );
    }

  /** Delegates to the underlying graph's {@code getAllEdges(source, target)}. */
  public Set<ProcessEdge> getAllEdges( Process sourceVertex, Process targetVertex )
    {
    return graph.getAllEdges( sourceVertex, targetVertex );
    }

  /** Delegates to the underlying graph's {@code getEdge(source, target)}. */
  public ProcessEdge getEdge( Process sourceVertex, Process targetVertex )
    {
    return graph.getEdge( sourceVertex, targetVertex );
    }

  /** Delegates to the underlying graph's {@code addEdge(source, target)}. */
  public ProcessEdge addEdge( Process sourceVertex, Process targetVertex )
    {
    return graph.addEdge( sourceVertex, targetVertex );
    }

  /** Delegates to the underlying graph's {@code addEdge(source, target, edge)}. */
  public boolean addEdge( Process sourceVertex, Process targetVertex, ProcessEdge processEdge )
    {
    return graph.addEdge( sourceVertex, targetVertex, processEdge );
    }

  /** Delegates to the underlying graph's {@code getEdgeSource(edge)}. */
  public Process getEdgeSource( ProcessEdge processEdge )
    {
    return graph.getEdgeSource( processEdge );
    }

  /** Delegates to the underlying graph's {@code getEdgeTarget(edge)}. */
  public Process getEdgeTarget( ProcessEdge processEdge )
    {
    return graph.getEdgeTarget( processEdge );
    }

  /** Delegates to the underlying graph's {@code containsEdge(edge)}. */
  public boolean containsEdge( ProcessEdge processEdge )
    {
    return graph.containsEdge( processEdge );
    }

  /** Delegates to the underlying graph's {@code containsVertex(vertex)}. */
  public boolean containsVertex( Process process )
    {
    return graph.containsVertex( process );
    }

  /** Delegates to the underlying graph's {@code edgeSet()}. */
  public Set<ProcessEdge> edgeSet()
    {
    return graph.edgeSet();
    }

  /** Delegates to the underlying graph's {@code edgesOf(vertex)}. */
  public Set<ProcessEdge> edgesOf( Process vertex )
    {
    return graph.edgesOf( vertex );
    }

  /** Delegates to the underlying graph's {@code inDegreeOf(vertex)}. */
  public int inDegreeOf( Process vertex )
    {
    return graph.inDegreeOf( vertex );
    }

  /** Delegates to the underlying graph's {@code incomingEdgesOf(vertex)}. */
  public Set<ProcessEdge> incomingEdgesOf( Process vertex )
    {
    return graph.incomingEdgesOf( vertex );
    }

  /** Delegates to the underlying graph's {@code outDegreeOf(vertex)}. */
  public int outDegreeOf( Process vertex )
    {
    return graph.outDegreeOf( vertex );
    }

  /** Delegates to the underlying graph's {@code outgoingEdgesOf(vertex)}. */
  public Set<ProcessEdge> outgoingEdgesOf( Process vertex )
    {
    return graph.outgoingEdgesOf( vertex );
    }

  /** Delegates to the underlying graph's {@code removeEdge(source, target)}. */
  public ProcessEdge removeEdge( Process sourceVertex, Process targetVertex )
    {
    return graph.removeEdge( sourceVertex, targetVertex );
    }

  /** Delegates to the underlying graph's {@code removeEdge(edge)}. */
  public boolean removeEdge( ProcessEdge processEdge )
    {
    return graph.removeEdge( processEdge );
    }

  /** Delegates to the underlying graph's {@code removeVertex(vertex)}. */
  public boolean removeVertex( Process process )
    {
    return graph.removeVertex( process );
    }

  /** Delegates to the underlying graph's {@code vertexSet()}. */
  public Set<Process> vertexSet()
    {
    return graph.vertexSet();
    }

  /** Delegates to the underlying graph's {@code getEdgeWeight(edge)}. */
  public double getEdgeWeight( ProcessEdge processEdge )
    {
    return graph.getEdgeWeight( processEdge );
    }
  }