001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.tez.stream.element; 023 024import java.io.IOException; 025import java.util.Collection; 026import java.util.HashSet; 027import java.util.Set; 028 029import cascading.CascadingException; 030import cascading.flow.FlowProcess; 031import cascading.flow.SliceCounters; 032import cascading.flow.planner.Scope; 033import cascading.flow.stream.duct.Duct; 034import cascading.flow.stream.duct.DuctException; 035import cascading.flow.stream.element.InputSource; 036import cascading.flow.stream.element.SpliceGate; 037import cascading.flow.stream.graph.IORole; 038import cascading.flow.stream.graph.StreamGraph; 039import cascading.pipe.Pipe; 040import cascading.pipe.Splice; 041import cascading.tap.hadoop.util.MeasuredOutputCollector; 042import cascading.tuple.Tuple; 043import cascading.tuple.TupleEntry; 044import cascading.tuple.io.KeyTuple; 045import cascading.tuple.io.ValueTuple; 046import cascading.tuple.util.Resettable1; 047import cascading.util.SortedListMultiMap; 048import cascading.util.Util; 049import org.apache.hadoop.mapred.OutputCollector; 050import org.apache.tez.runtime.api.LogicalInput; 051import org.apache.tez.runtime.api.LogicalOutput; 052import org.apache.tez.runtime.library.api.KeyValueReader; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * 058 */ 059public class TezMergeGate extends SpliceGate<TupleEntry, TupleEntry> implements InputSource 060 { 061 private static final Logger LOG = LoggerFactory.getLogger( TezMergeGate.class ); 062 063 protected Collection<LogicalOutput> logicalOutputs; 064 protected SortedListMultiMap<Integer, LogicalInput> logicalInputs; 065 066 private MeasuredOutputCollector collector; 067 private TupleEntry valueEntry; 068 069 private final Resettable1<Tuple> keyTuple = new KeyTuple(); 070 071 public TezMergeGate( FlowProcess flowProcess, Splice splice, IORole role, Collection<LogicalOutput> logicalOutputs ) 072 { 073 super( flowProcess, splice, role ); 074 075 if( logicalOutputs == null || logicalOutputs.isEmpty() ) 076 throw new IllegalArgumentException( "output must not be null or empty" ); 077 078 this.logicalOutputs = logicalOutputs; 079 } 080 081 public TezMergeGate( FlowProcess flowProcess, Splice splice, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs ) 082 { 083 super( flowProcess, splice, role ); 084 085 if( logicalInputs == null || logicalInputs.getKeys().size() == 0 ) 086 throw new IllegalArgumentException( "inputs must not be null or empty" ); 087 088 Set<LogicalInput> inputs = new HashSet<>( logicalInputs.getValues() ); 089 090 if( inputs.size() != 1 ) 091 throw new IllegalArgumentException( "only supports a single input" ); 092 093 this.logicalInputs = logicalInputs; 094 } 095 096 @Override 097 public void initialize() 098 { 099 super.initialize(); 100 101 Scope outgoingScope = Util.getFirst( outgoingScopes ); 102 valueEntry = new TupleEntry( outgoingScope.getOutValuesFields(), true ); 103 } 104 105 @Override 106 public void bind( StreamGraph streamGraph ) 107 { 108 if( role != IORole.sink ) 109 next = getNextFor( streamGraph ); 110 } 111 112 @Override 113 public void prepare() 114 { 115 try 116 { 117 if( logicalInputs != null ) 118 { 119 for( LogicalInput logicalInput : logicalInputs.getValues() ) 120 { 121 LOG.info( "calling {}#start() on: {} {}, for {} inputs", logicalInput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ), logicalInputs.getValues().size() ); 122 123 logicalInput.start(); 124 } 125 } 126 127 if( logicalOutputs != null ) 128 { 129 for( LogicalOutput logicalOutput : logicalOutputs ) 130 { 131 LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ) ); 132 133 logicalOutput.start(); 134 } 135 } 136 } 137 catch( Exception exception ) 138 { 139 throw new CascadingException( "unable to start input/output", exception ); 140 } 141 142 if( role != IORole.source ) 143 collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() ); 144 145 super.prepare(); 146 } 147 148 @Override 149 public void start( Duct previous ) 150 { 151 if( next != null ) 152 super.start( previous ); 153 } 154 155 @Override 156 public void receive( Duct previous, int ordinal, TupleEntry incomingEntry ) 157 { 158 try 159 { 160 keyTuple.reset( incomingEntry.getTuple() ); 161 162 collector.collect( keyTuple, ValueTuple.NULL ); 163 flowProcess.increment( SliceCounters.Tuples_Written, 1 ); 164 } 165 catch( OutOfMemoryError error ) 166 { 167 handleReThrowableException( "out of memory, try increasing task memory allocation", error ); 168 } 169 catch( CascadingException exception ) 170 { 171 handleException( exception, incomingEntry ); 172 } 173 catch( Throwable throwable ) 174 { 175 handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry ); 176 } 177 } 178 179 @Override 180 public void complete( Duct previous ) 181 { 182 if( next != null ) 183 super.complete( previous ); 184 } 185 186 @Override 187 public void run( Object input ) throws Throwable 188 { 189 Throwable throwable = map(); 190 191 if( throwable != null ) 192 throw throwable; 193 } 194 195 protected Throwable map() throws Exception 196 { 197 Throwable localThrowable = null; 198 199 try 200 { 201 start( this ); 202 203 // if multiple ordinals, an input could be duplicated if sourcing multiple paths 204 LogicalInput logicalInput = Util.getFirst( logicalInputs.getValues() ); 205 206 KeyValueReader reader = (KeyValueReader) logicalInput.getReader(); 207 208 while( reader.next() ) 209 { 210 Tuple currentKey = (Tuple) reader.getCurrentKey(); 211 212 valueEntry.setTuple( currentKey ); 213 next.receive( this, 0, valueEntry ); 214 } 215 216 complete( this ); 217 } 218 catch( Throwable throwable ) 219 { 220 if( !( throwable instanceof OutOfMemoryError ) ) 221 LOG.error( "caught throwable", throwable ); 222 223 return throwable; 224 } 225 226 return localThrowable; 227 } 228 229 protected OutputCollector createOutputCollector() 230 { 231 if( logicalOutputs.size() == 1 ) 232 return new OldOutputCollector( Util.getFirst( logicalOutputs ) ); 233 234 final OutputCollector[] collectors = new OutputCollector[ logicalOutputs.size() ]; 235 236 int count = 0; 237 for( LogicalOutput logicalOutput : logicalOutputs ) 238 collectors[ count++ ] = new OldOutputCollector( logicalOutput ); 239 240 return new OutputCollector() 241 { 242 @Override 243 public void collect( Object key, Object value ) throws IOException 244 { 245 for( OutputCollector outputCollector : collectors ) 246 outputCollector.collect( key, value ); 247 } 248 }; 249 } 250 }