001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.tez.stream.graph; 023 024import java.io.IOException; 025import java.util.Collection; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Map; 030import java.util.Set; 031 032import cascading.flow.FlowElement; 033import cascading.flow.FlowElements; 034import cascading.flow.FlowException; 035import cascading.flow.FlowNode; 036import cascading.flow.FlowProcess; 037import cascading.flow.Flows; 038import cascading.flow.hadoop.stream.HadoopMemoryJoinGate; 039import cascading.flow.hadoop.util.HadoopUtil; 040import cascading.flow.stream.annotations.StreamMode; 041import cascading.flow.stream.duct.Duct; 042import cascading.flow.stream.duct.Gate; 043import cascading.flow.stream.element.InputSource; 044import cascading.flow.stream.element.MemoryHashJoinGate; 045import cascading.flow.stream.element.SinkStage; 046import cascading.flow.stream.element.SourceStage; 047import cascading.flow.stream.graph.IORole; 048import cascading.flow.stream.graph.NodeStreamGraph; 049import cascading.flow.tez.Hadoop2TezFlowProcess; 050import cascading.flow.tez.stream.element.TezBoundaryStage; 051import cascading.flow.tez.stream.element.TezCoGroupGate; 052import cascading.flow.tez.stream.element.TezGroupByGate; 053import cascading.flow.tez.stream.element.TezMergeGate; 054import cascading.flow.tez.stream.element.TezSinkStage; 055import cascading.flow.tez.stream.element.TezSourceStage; 056import cascading.flow.tez.util.TezUtil; 057import cascading.pipe.Boundary; 058import cascading.pipe.CoGroup; 059import cascading.pipe.Group; 060import cascading.pipe.GroupBy; 061import cascading.pipe.HashJoin; 062import cascading.pipe.Merge; 063import cascading.pipe.Pipe; 064import cascading.tap.Tap; 065import cascading.util.SetMultiMap; 066import cascading.util.SortedListMultiMap; 067import cascading.util.Util; 068import org.apache.hadoop.conf.Configuration; 069import org.apache.tez.dag.api.TezConfiguration; 070import org.apache.tez.runtime.api.LogicalInput; 071import org.apache.tez.runtime.api.LogicalOutput; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075import static cascading.flow.tez.util.TezUtil.*; 076 077/** 078 * 079 */ 080public class Hadoop2TezStreamGraph extends NodeStreamGraph 081 { 082 private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezStreamGraph.class ); 083 084 private InputSource streamedHead; 085 private Map<String, LogicalInput> inputMap; 086 private Map<String, LogicalOutput> outputMap; 087 private Map<LogicalInput, Configuration> inputConfigMap = new HashMap<>(); 088 private Map<LogicalOutput, Configuration> outputConfigMap = new HashMap<>(); 089 private SetMultiMap<String, LogicalInput> inputMultiMap; 090 private SetMultiMap<String, LogicalOutput> outputMultiMap; 091 092 public Hadoop2TezStreamGraph( Hadoop2TezFlowProcess currentProcess, FlowNode flowNode, Map<String, LogicalInput> inputMap, Map<String, LogicalOutput> outputMap ) 093 { 094 super( currentProcess, flowNode ); 095 this.inputMap = inputMap; 096 this.outputMap = outputMap; 097 098 buildGraph(); 099 100 setTraps(); 101 setScopes(); 102 103 printGraph( node.getID(), node.getName(), flowProcess.getCurrentSliceNum() ); 104 105 bind(); 106 107 printBoundGraph( node.getID(), node.getName(), flowProcess.getCurrentSliceNum() ); 108 } 109 110 public InputSource getStreamedHead() 111 { 112 return streamedHead; 113 } 114 115 protected void buildGraph() 116 { 117 inputMultiMap = new SetMultiMap<>(); 118 119 for( Map.Entry<String, LogicalInput> entry : inputMap.entrySet() ) 120 { 121 Configuration inputConfiguration = getInputConfiguration( entry.getValue() ); 122 inputConfigMap.put( entry.getValue(), inputConfiguration ); 123 124 inputMultiMap.addAll( getEdgeSourceID( entry.getValue(), inputConfiguration ), entry.getValue() ); 125 } 126 127 outputMultiMap = new SetMultiMap<>(); 128 129 for( Map.Entry<String, LogicalOutput> entry : outputMap.entrySet() ) 130 { 131 Configuration outputConfiguration = getOutputConfiguration( entry.getValue() ); 132 outputConfigMap.put( entry.getValue(), outputConfiguration ); 133 134 outputMultiMap.addAll( TezUtil.getEdgeSinkID( entry.getValue(), outputConfiguration ), entry.getValue() ); 135 } 136 137 // this made the assumption we can have a physical and logical input per vertex. seems we can't 138 if( inputMultiMap.getKeys().size() == 1 ) 139 { 140 streamedSource = Flows.getFlowElementForID( node.getSourceElements(), Util.getFirst( inputMultiMap.getKeys() ) ); 141 } 142 else 143 { 144 Set<FlowElement> sourceElements = new HashSet<>( node.getSourceElements() ); 145 Set<? extends FlowElement> accumulated = node.getSourceElements( StreamMode.Accumulated ); 146 147 sourceElements.removeAll( accumulated ); 148 149 if( sourceElements.size() != 1 ) 150 throw new IllegalStateException( "too many input source keys, got: " + Util.join( sourceElements, ", " ) ); 151 152 streamedSource = Util.getFirst( sourceElements ); 153 } 154 155 LOG.info( "using streamed source: " + streamedSource ); 156 157 streamedHead = handleHead( streamedSource, flowProcess ); 158 159 Set<FlowElement> accumulated = new HashSet<>( node.getSourceElements() ); 160 161 accumulated.remove( streamedSource ); 162 163 Hadoop2TezFlowProcess tezProcess = (Hadoop2TezFlowProcess) flowProcess; 164 TezConfiguration conf = tezProcess.getConfiguration(); 165 166 for( FlowElement flowElement : accumulated ) 167 { 168 LOG.info( "using accumulated source: " + flowElement ); 169 170 if( flowElement instanceof Tap ) 171 { 172 Tap source = (Tap) flowElement; 173 174 // allows client side config to be used cluster side 175 String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( source ) ); 176 177 if( property == null ) 178 throw new IllegalStateException( "accumulated source conf property missing for: " + source.getIdentifier() ); 179 180 conf = getSourceConf( tezProcess, conf, property ); 181 } 182 else 183 { 184 conf = (TezConfiguration) inputConfigMap.get( FlowElements.id( flowElement ) ); 185 } 186 187 FlowProcess flowProcess = conf == null ? tezProcess : new Hadoop2TezFlowProcess( tezProcess, conf ); 188 189 handleHead( flowElement, flowProcess ); 190 } 191 } 192 193 private TezConfiguration getSourceConf( FlowProcess<TezConfiguration> flowProcess, TezConfiguration conf, String property ) 194 { 195 Map<String, String> priorConf; 196 197 try 198 { 199 priorConf = (Map<String, String>) HadoopUtil.deserializeBase64( property, conf, HashMap.class, true ); 200 } 201 catch( IOException exception ) 202 { 203 throw new FlowException( "unable to deserialize properties", exception ); 204 } 205 206 return flowProcess.mergeMapIntoConfig( conf, priorConf ); 207 } 208 209 private InputSource handleHead( FlowElement source, FlowProcess flowProcess ) 210 { 211 Duct sourceDuct; 212 213 if( source instanceof Tap ) 214 sourceDuct = createSourceStage( (Tap) source, flowProcess ); 215 else if( source instanceof Merge ) 216 sourceDuct = createMergeStage( (Merge) source, IORole.source ); 217 else if( source instanceof Boundary ) 218 sourceDuct = createBoundaryStage( (Boundary) source, IORole.source ); 219 else if( ( (Group) source ).isGroupBy() ) 220 sourceDuct = createGroupByGate( (GroupBy) source, IORole.source ); 221 else 222 sourceDuct = createCoGroupGate( (CoGroup) source, IORole.source ); 223 224 addHead( sourceDuct ); 225 226 handleDuct( source, sourceDuct ); 227 228 return (InputSource) sourceDuct; 229 } 230 231 protected SourceStage createSourceStage( Tap source, FlowProcess flowProcess ) 232 { 233 String id = Tap.id( source ); 234 LogicalInput logicalInput = inputMap.get( id ); 235 236 if( logicalInput == null ) 237 logicalInput = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + id ) ); 238 239 if( logicalInput == null ) 240 return new SourceStage( flowProcess, source ); 241 242 return new TezSourceStage( flowProcess, source, logicalInput ); 243 } 244 245 @Override 246 protected SinkStage createSinkStage( Tap sink ) 247 { 248 String id = Tap.id( sink ); 249 LogicalOutput logicalOutput = outputMap.get( id ); 250 251 if( logicalOutput == null ) 252 logicalOutput = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + id ) ); 253 254 if( logicalOutput == null ) 255 throw new IllegalStateException( "could not find output for: " + sink ); 256 257 return new TezSinkStage( flowProcess, sink, logicalOutput ); 258 } 259 260 @Override 261 protected Duct createMergeStage( Merge element, IORole role ) 262 { 263 if( role == IORole.pass ) 264 return super.createMergeStage( element, IORole.pass ); 265 else if( role == IORole.sink ) 266 return createSinkMergeGate( element ); 267 else if( role == IORole.source ) 268 return createSourceMergeGate( element ); 269 else 270 throw new UnsupportedOperationException( "both role not supported with merge" ); 271 } 272 273 private Duct createSourceMergeGate( Merge element ) 274 { 275 return new TezMergeGate( flowProcess, element, IORole.source, createInputMap( element ) ); 276 } 277 278 private Duct createSinkMergeGate( Merge element ) 279 { 280 return new TezMergeGate( flowProcess, element, IORole.sink, findLogicalOutputs( element ) ); 281 } 282 283 @Override 284 protected Duct createBoundaryStage( Boundary element, IORole role ) 285 { 286 if( role == IORole.pass ) 287 return super.createBoundaryStage( element, IORole.pass ); 288 else if( role == IORole.sink ) 289 return createSinkBoundaryStage( element ); 290 else if( role == IORole.source ) 291 return createSourceBoundaryStage( element ); 292 else 293 throw new UnsupportedOperationException( "both role not supported with boundary" ); 294 } 295 296 private Duct createSourceBoundaryStage( Boundary element ) 297 { 298 return new TezBoundaryStage( flowProcess, element, IORole.source, findLogicalInput( element ) ); 299 } 300 301 private Duct createSinkBoundaryStage( Boundary element ) 302 { 303 return new TezBoundaryStage( flowProcess, element, IORole.sink, findLogicalOutputs( element ) ); 304 } 305 306 @Override 307 protected Gate createGroupByGate( GroupBy element, IORole role ) 308 { 309 if( role == IORole.sink ) 310 return createSinkGroupByGate( element ); 311 else 312 return createSourceGroupByGate( element ); 313 } 314 315 @Override 316 protected Gate createCoGroupGate( CoGroup element, IORole role ) 317 { 318 if( role == IORole.sink ) 319 return createSinkCoGroupByGate( element ); 320 else 321 return createSourceCoGroupByGate( element ); 322 } 323 324 private Gate createSinkCoGroupByGate( CoGroup element ) 325 { 326 return new TezCoGroupGate( flowProcess, element, IORole.sink, findLogicalOutput( element ) ); 327 } 328 329 private Gate createSourceCoGroupByGate( CoGroup element ) 330 { 331 return new TezCoGroupGate( flowProcess, element, IORole.source, createInputMap( element ) ); 332 } 333 334 protected Gate createSinkGroupByGate( GroupBy element ) 335 { 336 return new TezGroupByGate( flowProcess, element, IORole.sink, findLogicalOutput( element ) ); 337 } 338 339 protected Gate createSourceGroupByGate( GroupBy element ) 340 { 341 return new TezGroupByGate( flowProcess, element, IORole.source, createInputMap( element ) ); 342 } 343 344 private LogicalOutput findLogicalOutput( Pipe element ) 345 { 346 String id = Pipe.id( element ); 347 LogicalOutput logicalOutput = outputMap.get( id ); 348 349 if( logicalOutput == null ) 350 logicalOutput = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + id ) ); 351 352 if( logicalOutput == null ) 353 throw new IllegalStateException( "could not find output for: " + element ); 354 355 return logicalOutput; 356 } 357 358 private Collection<LogicalOutput> findLogicalOutputs( Pipe element ) 359 { 360 String id = Pipe.id( element ); 361 362 return outputMultiMap.getValues( id ); 363 } 364 365 private LogicalInput findLogicalInput( Pipe element ) 366 { 367 String id = Pipe.id( element ); 368 LogicalInput logicalInput = inputMap.get( id ); 369 370 if( logicalInput == null ) 371 logicalInput = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + id ) ); 372 373 if( logicalInput == null ) 374 throw new IllegalStateException( "could not find input for: " + element ); 375 376 return logicalInput; 377 } 378 379 /** 380 * Maps each input to an ordinal on the flowelement. an input may be bound to multiple ordinals. 381 * 382 * @param element 383 */ 384 private SortedListMultiMap<Integer, LogicalInput> createInputMap( FlowElement element ) 385 { 386 String id = FlowElements.id( element ); 387 SortedListMultiMap<Integer, LogicalInput> ordinalMap = new SortedListMultiMap<>(); 388 389 for( LogicalInput logicalInput : inputMap.values() ) 390 { 391 Configuration configuration = inputConfigMap.get( logicalInput ); 392 393 String foundID = configuration.get( "cascading.node.source" ); 394 395 if( Util.isEmpty( foundID ) ) 396 throw new IllegalStateException( "cascading.node.source property not set on source LogicalInput" ); 397 398 if( !foundID.equals( id ) ) 399 continue; 400 401 String values = configuration.get( "cascading.node.ordinals", "" ); 402 List<Integer> ordinals = Util.split( Integer.class, ",", values ); 403 404 for( Integer ordinal : ordinals ) 405 ordinalMap.put( ordinal, logicalInput ); 406 } 407 408 return ordinalMap; 409 } 410 411 @Override 412 protected MemoryHashJoinGate createNonBlockingJoinGate( HashJoin join ) 413 { 414 return new HadoopMemoryJoinGate( flowProcess, join ); // does not use a latch 415 } 416 }