001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.hadoop.stream.graph; 022 023import java.io.IOException; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.Set; 027 028import cascading.flow.FlowException; 029import cascading.flow.FlowNode; 030import cascading.flow.FlowProcess; 031import cascading.flow.hadoop.HadoopFlowProcess; 032import cascading.flow.hadoop.stream.HadoopMemoryJoinGate; 033import cascading.flow.hadoop.stream.element.HadoopCoGroupGate; 034import cascading.flow.hadoop.stream.element.HadoopGroupByGate; 035import cascading.flow.hadoop.stream.element.HadoopSinkStage; 036import cascading.flow.hadoop.util.HadoopUtil; 037import cascading.flow.planner.graph.ElementGraphs; 038import cascading.flow.stream.duct.Gate; 039import cascading.flow.stream.element.GroupingSpliceGate; 040import cascading.flow.stream.element.SinkStage; 041import cascading.flow.stream.element.SourceStage; 042import cascading.flow.stream.graph.IORole; 043import cascading.flow.stream.graph.NodeStreamGraph; 044import cascading.pipe.CoGroup; 045import cascading.pipe.GroupBy; 046import cascading.pipe.HashJoin; 047import cascading.tap.Tap; 048import org.apache.hadoop.mapred.JobConf; 049import org.apache.hadoop.mapred.Reporter; 050 051/** 052 * 053 */ 054public class HadoopMapStreamGraph extends NodeStreamGraph 055 { 056 private final Tap source; 057 private SourceStage streamedHead; 058 059 public HadoopMapStreamGraph( HadoopFlowProcess flowProcess, FlowNode node, Tap source ) 060 { 061 super( flowProcess, node, source ); 062 this.source = source; 063 064 buildGraph(); 065 066 setTraps(); 067 setScopes(); 068 069 printGraph( node.getID(), "map", flowProcess.getCurrentSliceNum() ); 070 bind(); 071 } 072 073 public SourceStage getStreamedHead() 074 { 075 return streamedHead; 076 } 077 078 protected void buildGraph() 079 { 080 streamedHead = handleHead( this.source, flowProcess ); 081 082 Set<Tap> tributaries = ElementGraphs.findSources( elementGraph, Tap.class ); 083 084 tributaries.remove( this.source ); // we cannot stream and accumulate the same source 085 086 // accumulated paths 087 for( Object source : tributaries ) 088 { 089 final HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess; 090 JobConf conf = hadoopProcess.getJobConf(); 091 092 // allows client side config to be used cluster side 093 String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( (Tap) source ) ); 094 095 if( property == null ) 096 throw new IllegalStateException( "accumulated source conf property missing for: " + ( (Tap) source ).getIdentifier() ); 097 098 conf = getSourceConf( hadoopProcess, conf, property ); 099 100 // the reporter isn't provided until after the #run method is called 101 flowProcess = new HadoopFlowProcess( hadoopProcess, conf ) 102 { 103 @Override 104 public Reporter getReporter() 105 { 106 return hadoopProcess.getReporter(); 107 } 108 }; 109 110 handleHead( (Tap) source, flowProcess ); 111 } 112 } 113 114 private JobConf getSourceConf( HadoopFlowProcess flowProcess, JobConf conf, String property ) 115 { 116 Map<String, String> priorConf; 117 try 118 { 119 priorConf = (Map<String, String>) HadoopUtil.deserializeBase64( property, conf, HashMap.class, true ); 120 } 121 catch( IOException exception ) 122 { 123 throw new FlowException( "unable to deserialize properties", exception ); 124 } 125 126 return flowProcess.mergeMapIntoConfig( conf, priorConf ); 127 } 128 129 private SourceStage handleHead( Tap source, FlowProcess flowProcess ) 130 { 131 SourceStage sourceDuct = new SourceStage( flowProcess, source ); 132 133 addHead( sourceDuct ); 134 135 handleDuct( source, sourceDuct ); 136 137 return sourceDuct; 138 } 139 140 @Override 141 protected SinkStage createSinkStage( Tap element ) 142 { 143 return new HadoopSinkStage( flowProcess, element ); 144 } 145 146 @Override 147 protected Gate createCoGroupGate( CoGroup element, IORole role ) 148 { 149 return new HadoopCoGroupGate( flowProcess, element, IORole.sink ); 150 } 151 152 @Override 153 protected Gate createGroupByGate( GroupBy element, IORole role ) 154 { 155 return new HadoopGroupByGate( flowProcess, element, role ); 156 } 157 158 @Override 159 protected GroupingSpliceGate createNonBlockingJoinGate( HashJoin join ) 160 { 161 return new HadoopMemoryJoinGate( flowProcess, join ); // does not use a latch 162 } 163 }