/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez.stream.element;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.hadoop.HadoopCoGroupClosure;
import cascading.flow.hadoop.util.TimedIterator;
import cascading.flow.stream.StopDataNotificationException;
import cascading.flow.stream.duct.DuctException;
import cascading.flow.stream.graph.IORole;
import cascading.flow.tez.TezCoGroupClosure;
import cascading.pipe.CoGroup;
import cascading.tuple.Tuple;
import cascading.tuple.io.TuplePair;
import cascading.util.LogUtil;
import cascading.util.SortedListMultiMap;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tez gate for a {@link CoGroup} pipe.
 * <p>
 * The input-side constructor receives the incoming {@link LogicalInput}s keyed by ordinal. {@link #reduce()} then
 * reads each of them through a {@link KeyValuesReader}. It lines up the current group of every ordinal by grouping
 * key, in the order given by {@code getKeyComparator()}. Each key is pushed downstream together with one
 * {@link TimedIterator} per ordinal.
 * <p>
 * The output-side constructor does not allocate the per-ordinal iterators. Presumably {@link #reduce()} is only
 * invoked on the input side (TODO confirm against {@link TezGroupGate}).
 */
public class TezCoGroupGate extends TezGroupGate
  {
  private static final Logger LOG = LoggerFactory.getLogger( TezCoGroupGate.class );

  /**
   * One iterator per incoming ordinal; slot {@code i} is re-pointed at ordinal {@code i}'s values for each key.
   * Only allocated by the input-side constructor.
   */
  protected TimedIterator<Tuple>[] timedIterators;

  /**
   * Creates the output-side gate; only forwards its arguments to the parent.
   */
  public TezCoGroupGate( FlowProcess flowProcess, CoGroup coGroup, IORole role, LogicalOutput logicalOutput )
    {
    super( flowProcess, coGroup, role, logicalOutput );
    }

  /**
   * Creates the input-side gate and allocates one {@link TimedIterator} per distinct input ordinal.
   * <p>
   * Each iterator records {@link SliceCounters#Read_Duration} and {@link SliceCounters#Tuples_Read}. The ordinal is
   * also passed to the iterator; its exact use is defined by {@link TimedIterator}.
   */
  public TezCoGroupGate( FlowProcess flowProcess, CoGroup coGroup, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs )
    {
    super( flowProcess, coGroup, role, logicalInputs );

    int inputOrdinals = logicalInputs.getKeys().size();

    timedIterators = new TimedIterator[ inputOrdinals ];

    for( int ordinal = 0; ordinal < inputOrdinals; ordinal++ )
      timedIterators[ ordinal ] = new TimedIterator<>( flowProcess, SliceCounters.Read_Duration, SliceCounters.Tuples_Read, ordinal );
    }

  /**
   * Drains every incoming reader, pushing one co-grouped key downstream at a time.
   * <p>
   * For each key, iterator slot {@code i} is reset to the values ordinal {@code i} supplied for that key. The slot is
   * presumably {@code null} when the ordinal had no values for it, because slots are pre-filled with {@code null}.
   * A {@link StopDataNotificationException} raised downstream is logged once and then ignored.
   *
   * @return {@code null} when all input was processed. Otherwise, the throwable that aborted processing; it is
   * logged unless it is an {@link OutOfMemoryError}.
   */
  @Override
  protected Throwable reduce() throws Exception
    {
    try
      {
      start( this );

      SortedListMultiMap<Integer, KeyValuesReader> readersByOrdinal = getKeyValuesReaders();
      int ordinalCount = readersByOrdinal.getKeys().size();
      SortedListMultiMap<Tuple, Iterable<Tuple>> pendingGroups = getSortedMultiMap( ordinalCount );

      Map.Entry<Tuple, List<Iterable<Tuple>>> group = forwardToNext( readersByOrdinal, pendingGroups, null );

      while( group != null )
        {
        List<Iterable<Tuple>> groupValues = group.getValue();

        for( int slot = 0; slot < timedIterators.length; slot++ )
          timedIterators[ slot ].reset( groupValues.get( slot ) );

        try
          {
          accept( group.getKey(), timedIterators );
          }
        catch( StopDataNotificationException exception )
          {
          LogUtil.logWarnOnce( LOG, "received unsupported stop data notification, ignoring: {}", exception.getMessage() );
          }

        // only the ordinals that contributed to this group are advanced
        group = forwardToNext( readersByOrdinal, pendingGroups, groupValues );
        }

      complete( this );

      return null;
      }
    catch( Throwable throwable )
      {
      if( !( throwable instanceof OutOfMemoryError ) )
        LOG.error( "caught throwable", throwable );

      return throwable;
      }
    }

  /**
   * Opens a {@link KeyValuesReader} for every incoming {@link LogicalInput}, preserving each input's ordinal.
   */
  private SortedListMultiMap<Integer, KeyValuesReader> getKeyValuesReaders() throws Exception
    {
    SortedListMultiMap<Integer, KeyValuesReader> byOrdinal = new SortedListMultiMap<>();

    for( Map.Entry<Integer, List<LogicalInput>> ordinalInputs : logicalInputs.getEntries() )
      {
      for( LogicalInput input : ordinalInputs.getValue() )
        {
        KeyValuesReader reader = (KeyValuesReader) input.getReader();

        byOrdinal.put( ordinalInputs.getKey(), reader );
        }
      }

    return byOrdinal;
    }

  /**
   * Advances the readers whose previous group was consumed, buffers their next group, and removes the smallest
   * buffered key.
   *
   * @param readers   readers keyed by input ordinal
   * @param iterables pending values keyed by grouping key, one slot per ordinal
   * @param current   the values of the group just emitted, or {@code null} on the first call to advance every ordinal
   * @return the entry with the smallest pending key, or whatever {@code pollFirstEntry()} yields when nothing is
   * pending (presumably {@code null})
   */
  private Map.Entry<Tuple, List<Iterable<Tuple>>> forwardToNext( SortedListMultiMap<Integer, KeyValuesReader> readers, SortedListMultiMap<Tuple, Iterable<Tuple>> iterables, List<Iterable<Tuple>> current )
    {
    try
      {
      boolean firstPass = current == null;
      int ordinalCount = firstPass ? readers.getKeys().size() : current.size();

      for( int ordinal = 0; ordinal < ordinalCount; ordinal++ )
        {
        // an ordinal with a null slot did not take part in the last group, so its buffered group is still pending
        boolean consumed = firstPass || current.get( ordinal ) != null;

        if( consumed )
          {
          for( KeyValuesReader reader : readers.getValues( ordinal ) )
            {
            if( reader.next() )
              {
              Object rawKey = reader.getCurrentKey();

              // sorted (secondary sort) keys arrive as a pair whose left-hand side is the grouping key
              Tuple groupKey = splice.isSorted() ? ( (TuplePair) rawKey ).getLhs() : (Tuple) rawKey;

              groupKey = getDelegatedTuple( groupKey ); // applies hasher

              Iterable<Tuple> groupValues = (Iterable) reader.getCurrentValues();

              iterables.set( groupKey, ordinal, groupValues );
              }
            }
          }
        }
      }
    catch( OutOfMemoryError error )
      {
      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
      }
    catch( CascadingException exception )
      {
      handleException( exception, null );
      }
    catch( Throwable throwable )
      {
      handleException( new DuctException( "internal error", throwable ), null );
      }

    return iterables.pollFirstEntry();
    }

  /**
   * Creates the key-ordered buffer of pending groups.
   * <p>
   * Keys are ordered by {@code getKeyComparator()}. Every newly created per-key list is pre-filled with
   * {@code length} nulls, giving one slot per ordinal.
   */
  private SortedListMultiMap<Tuple, Iterable<Tuple>> getSortedMultiMap( final int length )
    {
    return new SortedListMultiMap<Tuple, Iterable<Tuple>>( getKeyComparator(), length )
      {
      @Override
      protected List createCollection()
        {
        List<Iterable<Tuple>> slots = super.createCollection();

        slots.addAll( Collections.<Iterable<Tuple>>nCopies( length, null ) ); // init with nulls

        return slots;
        }
      };
    }

  /**
   * Supplies the Tez co-group closure, sized by the splice's self-join count.
   */
  @Override
  protected HadoopCoGroupClosure createClosure()
    {
    return new TezCoGroupClosure( flowProcess, splice.getNumSelfJoins(), keyFields, valuesFields );
    }

  /**
   * Returns the grouping tuple as-is; the key is already unwrapped in {@code forwardToNext}.
   */
  @Override
  protected Tuple unwrapGrouping( Tuple grouping )
    {
    return grouping;
    }
  }