/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner.process;

import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.planner.Scope;
import cascading.flow.planner.graph.AnnotatedGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.flow.planner.graph.Extent;
import cascading.pipe.Group;
import cascading.tap.Tap;
import cascading.util.EnumMultiMap;
import cascading.util.Util;
import cascading.util.jgrapht.IntegerNameProvider;
import cascading.util.jgrapht.VertexNameProvider;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.Util.createIdentitySet;

/**
 * Class BaseProcessGraph is the base {@link ProcessGraph} implementation. It wraps a
 * {@link SimpleDirectedGraph} whose vertices are {@code Process} instances and whose edges are
 * {@link ProcessEdge} instances, and tracks the source elements, sink elements and traps
 * contributed by every process added through {@link #addVertex(ProcessModel)}.
 * <p>
 * Most graph accessors simply delegate to the wrapped graph.
 *
 * @param <Process> the {@link ProcessModel} sub-type managed by this graph
 */
public abstract class BaseProcessGraph<Process extends ProcessModel> implements ProcessGraph<Process>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( BaseProcessGraph.class );

  /** The wrapped directed graph all vertex and edge operations delegate to. */
  final SimpleDirectedGraph<Process, ProcessEdge> graph;

  /** Source elements accumulated from added processes, pruned by {@link #bindEdges()}. Identity based. */
  protected Set<FlowElement> sourceElements = createIdentitySet();
  /** Sink elements accumulated from added processes, pruned by {@link #bindEdges()}. Identity based. */
  protected Set<FlowElement> sinkElements = createIdentitySet();
  /** Lazily computed cache for {@link #getSourceTaps()}, reset to null by {@link #bindEdges()}. */
  private Set<Tap> sourceTaps;
  /** Lazily computed cache for {@link #getSinkTaps()}, reset to null by {@link #bindEdges()}. */
  private Set<Tap> sinkTaps;
  /** Traps accumulated from the trap map of every added process. */
  protected Map<String, Tap> trapsMap = new HashMap<>();

  /** Constructor BaseProcessGraph creates a new, empty process graph. */
  public BaseProcessGraph()
    {
    graph = new SimpleDirectedGraph( ProcessEdge.class );
    }

  /**
   * Method addVertex records the source elements, sink elements and traps of the given process
   * and then adds the process to the underlying graph.
   * <p>
   * The elements and traps are recorded even if the underlying graph reports the vertex was not added.
   *
   * @param process the process to add
   * @return the value returned by the underlying graph
   */
  @Override
  public boolean addVertex( Process process )
    {
    sourceElements.addAll( process.getSourceElements() );
    sinkElements.addAll( process.getSinkElements() );
    trapsMap.putAll( process.getTrapMap() );

    return graph.addVertex( process );
    }

  /**
   * Method bindEdges connects the processes currently in this graph.
   * <p>
   * First, whenever the graph holds more than one process, every element that is a sink of some
   * process is removed from this graph's source elements, and every element that is a source of
   * some process is removed from this graph's sink elements.
   * <p>
   * Second, for every ordered pair of distinct processes, a {@link ProcessEdge} is added for each
   * sink element of the first process that is also a source element of the second.
   * <p>
   * Finally the cached source and sink tap sets are discarded so they are recomputed on next access.
   */
  public void bindEdges()
    {
    for( Process sinkProcess : vertexSet() )
      {
      for( Process sourceProcess : vertexSet() )
        {
        if( sourceProcess == sinkProcess )
          continue;

        // outer edge sources and sinks to this graph
        sourceElements.removeAll( sinkProcess.getSinkElements() );
        sinkElements.removeAll( sourceProcess.getSourceElements() );
        }
      }

    for( Process sinkProcess : vertexSet() )
      {
      for( Process sourceProcess : vertexSet() )
        {
        if( sourceProcess == sinkProcess )
          continue;

        // an element shared as sink of one process and source of another is the edge between them
        for( FlowElement intermediate : sourceProcess.getSinkElements() )
          {
          if( sinkProcess.getSourceElements().contains( intermediate ) )
            addEdge( sourceProcess, sinkProcess, new ProcessEdge<>( sourceProcess, intermediate, sinkProcess ) );
          }
        }
      }

    // clear tap map caches
    sourceTaps = null;
    sinkTaps = null;
    }

  /**
   * Method getSourceElements returns the source elements tracked by this graph.
   *
   * @return the live, mutable internal set, not a copy
   */
  @Override
  public Set<FlowElement> getSourceElements()
    {
    return sourceElements;
    }

  /**
   * Method getSinkElements returns the sink elements tracked by this graph.
   *
   * @return the live, mutable internal set, not a copy
   */
  @Override
  public Set<FlowElement> getSinkElements()
    {
    return sinkElements;
    }

  /**
   * Method getSourceTaps returns the result of narrowing {@link #getSourceElements()} to {@link Tap}
   * via {@link Util#narrowIdentitySet(Class, java.util.Collection)}.
   * <p>
   * The result is cached until {@link #bindEdges()} is next called.
   *
   * @return the cached set of source taps
   */
  @Override
  public Set<Tap> getSourceTaps()
    {
    if( sourceTaps == null )
      sourceTaps = Util.narrowIdentitySet( Tap.class, getSourceElements() );

    return sourceTaps;
    }

  /**
   * Method getSourceTapsMap maps the name of every {@link Scope} leaving a source tap, within any
   * process that declares that tap as a source, to the tap itself.
   * <p>
   * If several scopes share a name, the last one visited wins.
   *
   * @return a new map of scope name to source tap
   */
  @Override
  public Map<String, Tap> getSourceTapsMap()
    {
    Map<String, Tap> result = new HashMap<>();

    for( Tap sourceTap : getSourceTaps() )
      {
      for( Process process : graph.vertexSet() )
        {
        if( !process.getSourceTaps().contains( sourceTap ) )
          continue;

        ElementGraph elementGraph = process.getElementGraph();

        for( Scope scope : elementGraph.outgoingEdgesOf( sourceTap ) )
          result.put( scope.getName(), sourceTap );
        }
      }

    return result;
    }

  /**
   * Method getSinkTaps returns the result of narrowing {@link #getSinkElements()} to {@link Tap}
   * via {@link Util#narrowIdentitySet(Class, java.util.Collection)}.
   * <p>
   * The result is cached until {@link #bindEdges()} is next called.
   *
   * @return the cached set of sink taps
   */
  @Override
  public Set<Tap> getSinkTaps()
    {
    if( sinkTaps == null )
      sinkTaps = Util.narrowIdentitySet( Tap.class, getSinkElements() );

    return sinkTaps;
    }

  /**
   * Method getSinkTapsMap maps the name of every {@link Scope} entering a sink tap, within any
   * process that declares that tap as a sink, to the tap itself.
   * <p>
   * If several scopes share a name, the last one visited wins.
   *
   * @return a new map of scope name to sink tap
   */
  @Override
  public Map<String, Tap> getSinkTapsMap()
    {
    Map<String, Tap> result = new HashMap<>();

    for( Tap sinkTap : getSinkTaps() )
      {
      for( Process process : graph.vertexSet() )
        {
        if( !process.getSinkTaps().contains( sinkTap ) )
          continue;

        ElementGraph elementGraph = process.getElementGraph();

        for( Scope scope : elementGraph.incomingEdgesOf( sinkTap ) )
          result.put( scope.getName(), sinkTap );
        }
      }

    return result;
    }

  /**
   * Method getTrapsMap returns the traps collected from every added process.
   *
   * @return the live, mutable internal map, not a copy
   */
  @Override
  public Map<String, Tap> getTrapsMap()
    {
    return trapsMap;
    }

  /**
   * Method getTopologicalIterator returns a topological iterator that orders otherwise
   * unconstrained processes by ascending {@code getSubmitPriority()}.
   *
   * @return a new iterator over the processes in this graph
   */
  @Override
  public Iterator<Process> getTopologicalIterator()
    {
    return getOrderedTopologicalIterator( new Comparator<Process>()
      {
      @Override
      public int compare( Process lhs, Process rhs )
        {
        return Integer.compare( lhs.getSubmitPriority(), rhs.getSubmitPriority() );
        }
      } );
    }

  /**
   * Method getOrdinalTopologicalIterator returns a topological iterator that orders otherwise
   * unconstrained processes by ascending {@code getOrdinal()}.
   *
   * @return a new iterator over the processes in this graph
   */
  @Override
  public Iterator<Process> getOrdinalTopologicalIterator()
    {
    return getOrderedTopologicalIterator( new Comparator<Process>()
      {
      @Override
      public int compare( Process lhs, Process rhs )
        {
        return Integer.compare( lhs.getOrdinal(), rhs.getOrdinal() );
        }
      } );
    }

  /**
   * Method getOrderedTopologicalIterator returns a {@link TopologicalOrderIterator} over this graph
   * that is handed a {@link PriorityQueue} built on the given comparator.
   *
   * @param comparator the ordering applied by the priority queue
   * @return a new iterator over the processes in this graph
   */
  @Override
  public Iterator<Process> getOrderedTopologicalIterator( Comparator<Process> comparator )
    {
    return new TopologicalOrderIterator<>( graph, new PriorityQueue<>( 10, comparator ) );
    }

  /**
   * Method getElementGraphs returns the element graph of every process in this graph.
   *
   * @return a new identity based set of element graphs
   */
  @Override
  public Set<ElementGraph> getElementGraphs()
    {
    Set<ElementGraph> results = createIdentitySet();

    for( Process process : vertexSet() )
      results.add( process.getElementGraph() );

    return results;
    }

  /**
   * Method getElementGraphs returns the element graphs of all processes containing the given element.
   *
   * @param flowElement the element to look for
   * @return a new list, empty if no process contains the element
   */
  @Override
  public List<ElementGraph> getElementGraphs( FlowElement flowElement )
    {
    List<ElementGraph> elementGraphs = new ArrayList<>();

    for( Process elementProcess : getElementProcesses( flowElement ) )
      elementGraphs.add( elementProcess.getElementGraph() );

    return elementGraphs;
    }

  /**
   * Method getElementProcesses returns all processes whose element graph has the given element as a vertex.
   *
   * @param flowElement the element to look for
   * @return a new list, empty if no process contains the element
   */
  @Override
  public List<Process> getElementProcesses( FlowElement flowElement )
    {
    List<Process> processes = new ArrayList<>();

    for( Process process : vertexSet() )
      {
      if( process.getElementGraph().vertexSet().contains( flowElement ) )
        processes.add( process );
      }

    return processes;
    }

  /**
   * Method getElementGraphs returns the element graphs of all processes containing the given scope.
   *
   * @param scope the scope (edge) to look for
   * @return a new list, empty if no process contains the scope
   */
  @Override
  public List<ElementGraph> getElementGraphs( Scope scope )
    {
    List<ElementGraph> elementGraphs = new ArrayList<>();

    for( Process elementProcess : getElementProcesses( scope ) )
      elementGraphs.add( elementProcess.getElementGraph() );

    return elementGraphs;
    }

  /**
   * Method getElementProcesses returns all processes whose element graph has the given scope as an edge.
   *
   * @param scope the scope (edge) to look for
   * @return a new list, empty if no process contains the scope
   */
  @Override
  public List<Process> getElementProcesses( Scope scope )
    {
    List<Process> processes = new ArrayList<>();

    for( Process process : vertexSet() )
      {
      if( process.getElementGraph().edgeSet().contains( scope ) )
        processes.add( process );
      }

    return processes;
    }

  /**
   * Method getElementSourceProcesses returns the processes that write to the given element, that is,
   * those listing it among their sink elements.
   *
   * @param flowElement the element to look for
   * @return a new list, empty if no process sinks into the element
   */
  @Override
  public List<Process> getElementSourceProcesses( FlowElement flowElement )
    {
    List<Process> sources = new ArrayList<>();

    for( Process process : vertexSet() )
      {
      if( process.getSinkElements().contains( flowElement ) )
        sources.add( process );
      }

    return sources;
    }

  /**
   * Method getElementSinkProcesses returns the processes that read from the given element, that is,
   * those listing it among their source elements.
   *
   * @param flowElement the element to look for
   * @return a new list, empty if no process sources from the element
   */
  @Override
  public List<Process> getElementSinkProcesses( FlowElement flowElement )
    {
    List<Process> sinks = new ArrayList<>();

    for( Process process : vertexSet() )
      {
      if( process.getSourceElements().contains( flowElement ) )
        sinks.add( process );
      }

    return sinks;
    }

  /**
   * Method getAllSourceElements returns the union of the source elements of every process,
   * including those that connect two processes.
   *
   * @return a new identity based set
   */
  @Override
  public Set<FlowElement> getAllSourceElements()
    {
    Set<FlowElement> results = createIdentitySet();

    for( Process process : vertexSet() )
      results.addAll( process.getSourceElements() );

    return results;
    }

  /**
   * Method getAllSinkElements returns the union of the sink elements of every process,
   * including those that connect two processes.
   *
   * @return a new identity based set
   */
  @Override
  public Set<FlowElement> getAllSinkElements()
    {
    Set<FlowElement> results = createIdentitySet();

    for( Process process : vertexSet() )
      results.addAll( process.getSinkElements() );

    return results;
    }

  /**
   * Method getAnnotations merges the annotations of every process element graph that is an
   * {@link AnnotatedGraph}. Element graphs of any other type are skipped.
   *
   * @return a new multi-map holding the merged annotations
   */
  public EnumMultiMap<FlowElement> getAnnotations()
    {
    EnumMultiMap<FlowElement> annotations = new EnumMultiMap<>();

    for( Process process : vertexSet() )
      {
      ElementGraph elementGraph = process.getElementGraph();

      if( elementGraph instanceof AnnotatedGraph )
        annotations.addAll( ( (AnnotatedGraph) elementGraph ).getAnnotations() );
      }

    return annotations;
    }

  /**
   * All elements, from the given ElementGraph, that belong to two or more processes, that are not sink or source
   * elements that connect processes.
   * <p>
   * {@link Extent#head} and {@link Extent#tail} are never reported.
   *
   * @param elementGraph the graph whose vertices are tested
   * @return a new identity based set of the duplicated elements
   */
  @Override
  public Set<FlowElement> getDuplicatedElements( ElementGraph elementGraph )
    {
    Set<FlowElement> results = createIdentitySet();

    for( FlowElement flowElement : elementGraph.vertexSet() )
      {
      if( getElementProcesses( flowElement ).size() > 1 )
        results.add( flowElement );
      }

    results.remove( Extent.head );
    results.remove( Extent.tail );
    results.removeAll( getAllSourceElements() );
    results.removeAll( getAllSinkElements() );

    return results;
    }

  /**
   * Method getIdentityElementGraphs returns the element graphs of all processes returned by
   * {@link #getIdentityProcesses()}.
   *
   * @return a new identity based set of element graphs
   */
  @Override
  public Set<ElementGraph> getIdentityElementGraphs()
    {
    Set<ElementGraph> results = createIdentitySet();

    for( Process process : getIdentityProcesses() )
      results.add( process.getElementGraph() );

    return results;
    }

  /**
   * Returns a set of processes that perform no internal operations, as decided by
   * {@link ProcessModels#isIdentity(ProcessModel)}.
   * <p>
   * For example if a FlowNode only has a Merge source and a GroupBy sink.
   *
   * @return a new set of the identity processes, empty if there are none
   */
  @Override
  public Set<Process> getIdentityProcesses()
    {
    Set<Process> results = new HashSet<>();

    for( Process process : vertexSet() )
      {
      if( ProcessModels.isIdentity( process ) )
        results.add( process );
      }

    return results;
    }

  /**
   * Method writeDOT writes this process graph to a DOT file for easy visualization and debugging.
   *
   * @param filename of type String
   */
  @Override
  public void writeDOT( String filename )
    {
    printProcessGraph( filename );
    }

  /**
   * Method printProcessGraph writes this graph in DOT format to the given file.
   * <p>
   * Every vertex is labeled with the process name followed by one entry per source tap, per
   * non-empty group name and per sink tap. Double quotes in the label are replaced by single quotes.
   * <p>
   * An {@link IOException} is logged and not rethrown, so a failure never interrupts the caller.
   *
   * @param filename path of the file to write
   */
  protected void printProcessGraph( String filename )
    {
    // try-with-resources guarantees the file handle is released even if writing fails part way
    try( Writer writer = new FileWriter( filename ) )
      {
      Util.writeDOT( writer, graph, new IntegerNameProvider<Process>(), new VertexNameProvider<Process>()
        {
        @Override
        public String getVertexName( Process process )
          {
          StringBuilder name = new StringBuilder();

          name.append( "[" ).append( process.getName() ).append( "]" );

          Set<Tap> sources = process.getSourceTaps();

          for( Tap source : sources )
            name.append( "\\nsrc:[" ).append( source.getIdentifier() ).append( "]" );

          Collection<Group> groups = process.getGroups();

          for( Group group : groups )
            {
            String groupName = group.getName();

            if( groupName.length() != 0 )
              name.append( "\\ngrp:" ).append( groupName );
            }

          Set<Tap> sinks = process.getSinkTaps();

          // append, not assign, so every sink is listed rather than only the last one
          for( Tap sink : sinks )
            name.append( "\\nsnk:[" ).append( sink.getIdentifier() ).append( "]" );

          return name.toString().replaceAll( "\"", "\'" );
          }
        }, null );
      }
    catch( IOException exception )
      {
      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
      }
    }

  /**
   * Method writeDOTNested delegates to
   * {@link ElementGraphs#printProcessGraph(String, ElementGraph, ProcessGraph)} with the given
   * element graph and this process graph.
   *
   * @param filename path of the file to write
   * @param graph    the element graph to render
   */
  @Override
  public void writeDOTNested( String filename, ElementGraph graph )
    {
    ElementGraphs.printProcessGraph( filename, graph, this );
    }

  /** Delegates to the underlying graph. */
  public boolean containsEdge( Process sourceVertex, Process targetVertex )
    {
    return graph.containsEdge( sourceVertex, targetVertex );
    }

  /** Delegates to the underlying graph. */
  public boolean removeAllEdges( Collection<? extends ProcessEdge> edges )
    {
    return graph.removeAllEdges( edges );
    }

  /** Delegates to the underlying graph. */
  public Set<ProcessEdge> removeAllEdges( Process sourceVertex, Process targetVertex )
    {
    return graph.removeAllEdges( sourceVertex, targetVertex );
    }

  /**
   * Delegates to the underlying graph.
   * <p>
   * The source elements, sink elements and traps recorded by {@link #addVertex(ProcessModel)} are not updated.
   */
  public boolean removeAllVertices( Collection<? extends Process> vertices )
    {
    return graph.removeAllVertices( vertices );
    }

  /** Delegates to the underlying graph. */
  public Set<ProcessEdge> getAllEdges( Process sourceVertex, Process targetVertex )
    {
    return graph.getAllEdges( sourceVertex, targetVertex );
    }

  /** Delegates to the underlying graph. */
  public ProcessEdge getEdge( Process sourceVertex, Process targetVertex )
    {
    return graph.getEdge( sourceVertex, targetVertex );
    }

  /** Delegates to the underlying graph. */
  public ProcessEdge addEdge( Process sourceVertex, Process targetVertex )
    {
    return graph.addEdge( sourceVertex, targetVertex );
    }

  /** Delegates to the underlying graph. */
  public boolean addEdge( Process sourceVertex, Process targetVertex, ProcessEdge processEdge )
    {
    return graph.addEdge( sourceVertex, targetVertex, processEdge );
    }

  /** Delegates to the underlying graph. */
  public Process getEdgeSource( ProcessEdge processEdge )
    {
    return graph.getEdgeSource( processEdge );
    }

  /** Delegates to the underlying graph. */
  public Process getEdgeTarget( ProcessEdge processEdge )
    {
    return graph.getEdgeTarget( processEdge );
    }

  /** Delegates to the underlying graph. */
  public boolean containsEdge( ProcessEdge processEdge )
    {
    return graph.containsEdge( processEdge );
    }

  /** Delegates to the underlying graph. */
  public boolean containsVertex( Process process )
    {
    return graph.containsVertex( process );
    }

  /** Delegates to the underlying graph. */
  public Set<ProcessEdge> edgeSet()
    {
    return graph.edgeSet();
    }

  /** Delegates to the underlying graph. */
  public Set<ProcessEdge> edgesOf( Process vertex )
    {
    return graph.edgesOf( vertex );
    }

  /** Delegates to the underlying graph. */
  public int inDegreeOf( Process vertex )
    {
    return graph.inDegreeOf( vertex );
    }

  /** Delegates to the underlying graph. */
  public Set<ProcessEdge> incomingEdgesOf( Process vertex )
    {
    return graph.incomingEdgesOf( vertex );
    }

  /** Delegates to the underlying graph. */
  public int outDegreeOf( Process vertex )
    {
    return graph.outDegreeOf( vertex );
    }

  /** Delegates to the underlying graph. */
  public Set<ProcessEdge> outgoingEdgesOf( Process vertex )
    {
    return graph.outgoingEdgesOf( vertex );
    }

  /** Delegates to the underlying graph. */
  public ProcessEdge removeEdge( Process sourceVertex, Process targetVertex )
    {
    return graph.removeEdge( sourceVertex, targetVertex );
    }

  /** Delegates to the underlying graph. */
  public boolean removeEdge( ProcessEdge processEdge )
    {
    return graph.removeEdge( processEdge );
    }

  /**
   * Delegates to the underlying graph.
   * <p>
   * The source elements, sink elements and traps recorded by {@link #addVertex(ProcessModel)} are not updated.
   */
  public boolean removeVertex( Process process )
    {
    return graph.removeVertex( process );
    }

  /** Delegates to the underlying graph. */
  public Set<Process> vertexSet()
    {
    return graph.vertexSet();
    }

  /** Delegates to the underlying graph. */
  public double getEdgeWeight( ProcessEdge processEdge )
    {
    return graph.getEdgeWeight( processEdge );
    }
  }