001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.hadoop.stream.graph; 022 023import cascading.flow.FlowElement; 024import cascading.flow.FlowNode; 025import cascading.flow.hadoop.HadoopFlowProcess; 026import cascading.flow.hadoop.stream.element.HadoopCoGroupGate; 027import cascading.flow.hadoop.stream.element.HadoopGroupByGate; 028import cascading.flow.hadoop.stream.element.HadoopSinkStage; 029import cascading.flow.stream.duct.Duct; 030import cascading.flow.stream.duct.Gate; 031import cascading.flow.stream.element.SinkStage; 032import cascading.flow.stream.graph.IORole; 033import cascading.flow.stream.graph.NodeStreamGraph; 034import cascading.pipe.CoGroup; 035import cascading.pipe.Group; 036import cascading.pipe.GroupBy; 037import cascading.pipe.HashJoin; 038import cascading.tap.Tap; 039import cascading.util.Util; 040 041/** 042 * 043 */ 044public class HadoopReduceStreamGraph extends NodeStreamGraph 045 { 046 public HadoopReduceStreamGraph( HadoopFlowProcess flowProcess, FlowNode node, FlowElement sourceElement ) 047 { 048 super( flowProcess, node, sourceElement ); 049 050 buildGraph(); 051 052 setTraps(); 053 setScopes(); 054 055 printGraph( node.getID(), "reduce", flowProcess.getCurrentSliceNum() ); 056 057 bind(); 058 } 059 060 protected void buildGraph() 061 { 062 Group group = (Group) Util.getFirst( node.getSourceElements() ); 063 064 Duct rhsDuct; 065 066 if( group.isGroupBy() ) 067 rhsDuct = new HadoopGroupByGate( flowProcess, (GroupBy) group, IORole.source ); 068 else 069 rhsDuct = new HadoopCoGroupGate( flowProcess, (CoGroup) group, IORole.source ); 070 071 addHead( rhsDuct ); 072 073 handleDuct( group, rhsDuct ); 074 } 075 076 @Override 077 protected SinkStage createSinkStage( Tap element ) 078 { 079 return new HadoopSinkStage( flowProcess, element ); 080 } 081 082 protected Gate createCoGroupGate( CoGroup element, IORole role ) 083 { 084 throw new IllegalStateException( "should not happen" ); 085 } 086 087 @Override 088 protected Gate createGroupByGate( GroupBy element, IORole role ) 089 { 090 throw new IllegalStateException( "should not happen" ); 091 } 092 093 @Override 094 protected Gate createHashJoinGate( HashJoin join ) 095 { 096 throw new IllegalStateException( "should not happen" ); 097 } 098 099 }