/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez.stream.element;

import java.io.IOException;
import java.util.Collection;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.planner.Scope;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.DuctException;
import cascading.flow.stream.element.BoundaryStage;
import cascading.flow.stream.element.InputSource;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.graph.StreamGraph;
import cascading.pipe.Boundary;
import cascading.pipe.Pipe;
import cascading.tap.hadoop.util.MeasuredOutputCollector;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.ValueTuple;
import cascading.tuple.util.Resettable1;
import cascading.util.Util;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * TezBoundaryStage adapts a Cascading {@link Boundary} pipe to Tez logical
 * inputs/outputs. Depending on its {@link IORole} it either reads tuples from a
 * single {@link LogicalInput} and forwards them downstream ({@link InputSource}
 * side), or collects incoming tuples and writes them to one or more
 * {@link LogicalOutput}s (sink side). Tuples cross the boundary as the key of a
 * key/value pair; the value is always {@link ValueTuple#NULL}.
 */
public class TezBoundaryStage extends BoundaryStage<TupleEntry, TupleEntry> implements InputSource
  {
  private static final Logger LOG = LoggerFactory.getLogger( TezBoundaryStage.class );

  /** Outputs written to when this stage acts as a sink; null on the source side. */
  protected Collection<LogicalOutput> logicalOutputs;
  /** Input read from when this stage acts as a source; null on the sink side. */
  protected LogicalInput logicalInput;

  /** Wraps {@link #createOutputCollector()} to account write duration in {@link SliceCounters}. */
  private MeasuredOutputCollector collector;
  /** Reused entry handed downstream for every tuple read in {@link #map()}. */
  private TupleEntry valueEntry;

  /** Reused key wrapper so a new KeyTuple is not allocated per emitted tuple. */
  private final Resettable1<Tuple> keyTuple = new KeyTuple();

  /**
   * Constructs a sink-side stage writing to the given logical outputs.
   *
   * @param flowProcess    current flow process
   * @param boundary       the boundary pipe element this stage represents
   * @param role           the IO role of this stage within the stream graph
   * @param logicalOutputs the Tez outputs to write to, may not be null or empty
   * @throws IllegalArgumentException if logicalOutputs is null or empty
   */
  public TezBoundaryStage( FlowProcess flowProcess, Boundary boundary, IORole role, Collection<LogicalOutput> logicalOutputs )
    {
    super( flowProcess, boundary, role );

    if( logicalOutputs == null || logicalOutputs.isEmpty() )
      throw new IllegalArgumentException( "output must not be null or empty" );

    this.logicalOutputs = logicalOutputs;
    }

  /**
   * Constructs a source-side stage reading from the given logical input.
   *
   * @param flowProcess  current flow process
   * @param boundary     the boundary pipe element this stage represents
   * @param role         the IO role of this stage within the stream graph
   * @param logicalInput the Tez input to read from, may not be null
   * @throws IllegalArgumentException if logicalInput is null
   */
  public TezBoundaryStage( FlowProcess flowProcess, Boundary boundary, IORole role, LogicalInput logicalInput )
    {
    super( flowProcess, boundary, role );

    if( logicalInput == null )
      throw new IllegalArgumentException( "input must not be null" );

    this.logicalInput = logicalInput;
    }

  @Override
  public void initialize()
    {
    super.initialize();

    // the outgoing entry reuses the pass-through fields of the first outgoing scope
    Scope outgoingScope = Util.getFirst( outgoingScopes );
    valueEntry = new TupleEntry( outgoingScope.getIncomingFunctionPassThroughFields(), true );
    }

  @Override
  public void bind( StreamGraph streamGraph )
    {
    // a sink has no downstream duct to bind to
    if( role != IORole.sink )
      next = getNextFor( streamGraph );
    }

  @Override
  public void prepare()
    {
    try
      {
      if( logicalInput != null )
        {
        LOG.info( "calling {}#start() on: {} {}", logicalInput.getClass().getSimpleName(), getBoundary(), Pipe.id( getBoundary() ) );

        logicalInput.start();
        }

      if( logicalOutputs != null )
        {
        for( LogicalOutput logicalOutput : logicalOutputs )
          {
          LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getBoundary(), Pipe.id( getBoundary() ) );

          logicalOutput.start();
          }
        }
      }
    catch( Exception exception )
      {
      throw new CascadingException( "unable to start input/output", exception );
      }

    // only a non-source needs a collector to push tuples into the outputs
    if( role != IORole.source )
      collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() );

    super.prepare();
    }

  @Override
  public void start( Duct previous )
    {
    // a sink (next == null) does not propagate lifecycle events downstream
    if( next != null )
      super.start( previous );
    }

  @Override
  public void receive( Duct previous, int ordinal, TupleEntry incomingEntry )
    {
    try
      {
      Tuple tuple = incomingEntry.getTuple();

      // the tuple travels as the key; the value slot is always empty
      keyTuple.reset( tuple );

      collector.collect( keyTuple, ValueTuple.NULL );
      flowProcess.increment( SliceCounters.Tuples_Written, 1 );
      }
    catch( OutOfMemoryError error )
      {
      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
      }
    catch( CascadingException exception )
      {
      handleException( exception, incomingEntry );
      }
    catch( Throwable throwable )
      {
      handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry );
      }
    }

  @Override
  public void complete( Duct previous )
    {
    // a sink (next == null) does not propagate lifecycle events downstream
    if( next != null )
      super.complete( previous );
    }

  @Override
  public void run( Object input ) throws Throwable
    {
    Throwable throwable = map();

    if( throwable != null )
      throw throwable;
    }

  /**
   * Drains the logical input, forwarding each key tuple downstream.
   *
   * @return null on success, otherwise the Throwable that aborted the read loop
   * @throws Exception if the reader cannot be obtained
   */
  protected Throwable map() throws Exception
    {
    try
      {
      start( this );

      KeyValueReader reader = (KeyValueReader) logicalInput.getReader();

      while( reader.next() )
        {
        // only the key carries data across the boundary, see receive()
        Tuple currentKey = (Tuple) reader.getCurrentKey();

        valueEntry.setTuple( currentKey );
        next.receive( this, 0, valueEntry );
        }

      complete( this );
      }
    catch( Throwable throwable )
      {
      // OOM is expected to be handled/reported upstream, don't spam the log
      if( !( throwable instanceof OutOfMemoryError ) )
        LOG.error( "caught throwable", throwable );

      return throwable;
      }

    return null;
    }

  /**
   * Creates a collector fanning writes out to every logical output; a single
   * output is wrapped directly without the fan-out indirection.
   *
   * @return an OutputCollector writing to all registered logical outputs
   */
  protected OutputCollector createOutputCollector()
    {
    if( logicalOutputs.size() == 1 )
      return new OldOutputCollector( Util.getFirst( logicalOutputs ) );

    final OutputCollector[] collectors = new OutputCollector[ logicalOutputs.size() ];

    int count = 0;
    for( LogicalOutput logicalOutput : logicalOutputs )
      collectors[ count++ ] = new OldOutputCollector( logicalOutput );

    return new OutputCollector()
      {
      @Override
      public void collect( Object key, Object value ) throws IOException
        {
        for( OutputCollector outputCollector : collectors )
          outputCollector.collect( key, value );
        }
      };
    }
  }