001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.tez.stream.element; 022 023import cascading.flow.FlowProcess; 024import cascading.flow.SliceCounters; 025import cascading.flow.hadoop.HadoopGroupByClosure; 026import cascading.flow.hadoop.util.TimedIterator; 027import cascading.flow.stream.graph.IORole; 028import cascading.flow.tez.TezGroupByClosure; 029import cascading.flow.tez.util.SecondarySortKeyValuesReader; 030import cascading.pipe.GroupBy; 031import cascading.tuple.Tuple; 032import cascading.tuple.io.TuplePair; 033import cascading.util.SortedListMultiMap; 034import cascading.util.Util; 035import org.apache.tez.runtime.api.LogicalInput; 036import org.apache.tez.runtime.api.LogicalOutput; 037import org.apache.tez.runtime.library.api.KeyValuesReader; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * 043 */ 044public class TezGroupByGate extends TezGroupGate 045 { 046 private static final Logger LOG = LoggerFactory.getLogger( TezGroupByGate.class ); 047 048 protected TimedIterator[] timedIterators; 049 050 public TezGroupByGate( FlowProcess flowProcess, GroupBy groupBy, IORole role, LogicalOutput logicalOutput ) 051 { 052 super( flowProcess, groupBy, role, logicalOutput ); 053 } 054 055 public TezGroupByGate( FlowProcess flowProcess, GroupBy groupBy, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs ) 056 { 057 super( flowProcess, groupBy, role, logicalInputs ); 058 059 this.timedIterators = TimedIterator.iterators( new TimedIterator<>( flowProcess, SliceCounters.Read_Duration, SliceCounters.Tuples_Read ) ); 060 } 061 062 protected Throwable reduce() throws Exception 063 { 064 try 065 { 066 start( this ); 067 068 // if multiple ordinals, an input could be duplicated if sourcing multiple paths 069 LogicalInput logicalInput = Util.getFirst( logicalInputs.getValues() ); 070 071 KeyValuesReader reader = (KeyValuesReader) logicalInput.getReader(); 072 073 if( sortFields != null ) 074 reader = new SecondarySortKeyValuesReader( reader, groupComparators[ 0 ] ); 075 076 while( reader.next() ) 077 { 078 Tuple currentKey = (Tuple) reader.getCurrentKey(); // if secondary sorting, is a TuplePair 079 Iterable currentValues = reader.getCurrentValues(); 080 081 timedIterators[ 0 ].reset( currentValues ); 082 083 accept( currentKey, timedIterators ); // will unwrap the TuplePair 084 } 085 086 complete( this ); 087 } 088 catch( Throwable throwable ) 089 { 090 if( !( throwable instanceof OutOfMemoryError ) ) 091 LOG.error( "caught throwable", throwable ); 092 093 return throwable; 094 } 095 096 return null; 097 } 098 099 @Override 100 protected HadoopGroupByClosure createClosure() 101 { 102 return new TezGroupByClosure( flowProcess, keyFields, valuesFields ); 103 } 104 105 @Override 106 protected Tuple unwrapGrouping( Tuple key ) 107 { 108 // copying the lhs key during secondary sorting prevents the key from advancing at the end of the 109 // aggregation iterator 110 return sortFields == null ? key : new Tuple( ( (TuplePair) key ).getLhs() ); 111 } 112 }