001/* 002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.tez.stream.element; 023 024import cascading.flow.FlowProcess; 025import cascading.flow.SliceCounters; 026import cascading.flow.hadoop.HadoopGroupByClosure; 027import cascading.flow.hadoop.util.TimedIterator; 028import cascading.flow.stream.StopDataNotificationException; 029import cascading.flow.stream.graph.IORole; 030import cascading.flow.tez.TezGroupByClosure; 031import cascading.flow.tez.util.SecondarySortKeyValuesReader; 032import cascading.pipe.GroupBy; 033import cascading.tuple.Tuple; 034import cascading.tuple.io.TuplePair; 035import cascading.util.LogUtil; 036import cascading.util.SortedListMultiMap; 037import cascading.util.Util; 038import org.apache.tez.runtime.api.LogicalInput; 039import org.apache.tez.runtime.api.LogicalOutput; 040import org.apache.tez.runtime.library.api.KeyValuesReader; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * 046 */ 047public class TezGroupByGate extends TezGroupGate 048 { 049 private static final Logger LOG = LoggerFactory.getLogger( TezGroupByGate.class ); 050 051 protected TimedIterator[] timedIterators; 052 053 public TezGroupByGate( FlowProcess flowProcess, GroupBy groupBy, IORole role, LogicalOutput logicalOutput ) 054 { 055 super( flowProcess, groupBy, role, logicalOutput ); 056 } 057 058 public TezGroupByGate( FlowProcess flowProcess, GroupBy groupBy, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs ) 059 { 060 super( flowProcess, groupBy, role, logicalInputs ); 061 062 this.timedIterators = TimedIterator.iterators( new TimedIterator<>( flowProcess, SliceCounters.Read_Duration, SliceCounters.Tuples_Read ) ); 063 } 064 065 protected Throwable reduce() throws Exception 066 { 067 try 068 { 069 start( this ); 070 071 // if multiple ordinals, an input could be duplicated if sourcing multiple paths 072 LogicalInput logicalInput = Util.getFirst( logicalInputs.getValues() ); 073 074 KeyValuesReader reader = (KeyValuesReader) logicalInput.getReader(); 075 076 if( sortFields != null ) 077 reader = new SecondarySortKeyValuesReader( reader, groupComparators[ 0 ] ); 078 079 while( reader.next() ) 080 { 081 Tuple currentKey = (Tuple) reader.getCurrentKey(); // if secondary sorting, is a TuplePair 082 Iterable currentValues = reader.getCurrentValues(); 083 084 timedIterators[ 0 ].reset( currentValues ); 085 086 try 087 { 088 accept( currentKey, timedIterators ); // will unwrap the TuplePair 089 } 090 catch( StopDataNotificationException exception ) 091 { 092 LogUtil.logWarnOnce( LOG, "received unsupported stop data notification, ignoring: {}", exception.getMessage() ); 093 } 094 } 095 096 complete( this ); 097 } 098 catch( Throwable throwable ) 099 { 100 if( !( throwable instanceof OutOfMemoryError ) ) 101 LOG.error( "caught throwable", throwable ); 102 103 return throwable; 104 } 105 106 return null; 107 } 108 109 @Override 110 protected HadoopGroupByClosure createClosure() 111 { 112 return new TezGroupByClosure( flowProcess, keyFields, valuesFields ); 113 } 114 115 @Override 116 protected Tuple unwrapGrouping( Tuple key ) 117 { 118 // copying the lhs key during secondary sorting prevents the key from advancing at the end of the 119 // aggregation iterator 120 return sortFields == null ? key : new Tuple( ( (TuplePair) key ).getLhs() ); 121 } 122 }