/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez.util;

import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;

import cascading.CascadingException;
import cascading.tuple.Tuple;
import cascading.tuple.io.TuplePair;
import org.apache.tez.runtime.library.api.KeyValuesReader;

/**
 * A {@link KeyValuesReader} that wraps a parent reader whose keys are {@link TuplePair} instances and
 * presents consecutive parent entries as a single group when the left-hand side of their key pairs
 * compare as equal under the given grouping comparator.
 * <p>
 * {@link #next()} moves to the first parent entry of the next group. The Iterable returned by
 * {@link #getCurrentValues()} walks the values of the current parent entry and then keeps advancing
 * the parent, continuing with each following entry's values, until an entry with a different
 * grouping key (or the end of the parent) is reached.
 * <p>
 * {@link #getCurrentKey()} returns the key pair of the parent entry most recently read, so its result
 * changes as the values are iterated.
 */
public class SecondarySortKeyValuesReader extends KeyValuesReader
  {
  /** The wrapped reader; set to null once it reports that it has no more entries. */
  private KeyValuesReader parent;
  /** Compares the left-hand side of successive key pairs to detect a change of group. */
  private final Comparator<Tuple> groupComparator;
  /** Left-hand side of the key pair most recently read from the parent. */
  private Tuple currentKey;
  /** Values of the parent entry most recently read. */
  private Iterable<Object> currentValues;
  /** True when the entry most recently read from the parent started a new group that has not yet been handed out by next(). */
  private boolean isNewKey = false;
  /** Key pair most recently read from the parent. */
  private TuplePair currentKeyPair;

  /**
   * @param parent          the reader to wrap, its keys are cast to {@link TuplePair}
   * @param groupComparator comparator applied to the left-hand side of the key pairs to decide
   *                        whether two consecutive parent entries belong to the same group
   */
  public SecondarySortKeyValuesReader( KeyValuesReader parent, Comparator<Tuple> groupComparator )
    {
    this.parent = parent;
    this.groupComparator = groupComparator;
    }

  /**
   * Moves to the next group.
   *
   * @return true if positioned on the first parent entry of a new group, false if the parent is exhausted
   * @throws IOException if the parent reader fails
   */
  @Override
  public boolean next() throws IOException
    {
    // the values iterator may already have read the first entry of the next group
    // while looking for more values; in that case the reader is already positioned
    if( parent != null && isNewKey )
      {
      isNewKey = false; // allow the following next() to advance the parent
      return true;
      }

    // otherwise skip any remaining entries of the current group until an entry
    // with a new grouping key shows up or the parent runs out
    boolean advanced = advance();

    while( !isNewKey && advanced )
      advanced = advance();

    isNewKey = false;

    return advanced;
    }

  /**
   * Reads one entry from the parent and records its key pair, grouping key, values and whether it
   * starts a new group.
   *
   * @return true if an entry was read, false if the parent is (or already was) exhausted
   * @throws IOException if the parent reader fails
   */
  protected boolean advance() throws IOException
    {
    if( parent == null )
      return false;

    if( !parent.next() )
      {
      parent = null; // remember exhaustion so the parent is never called again
      return false;
      }

    currentKeyPair = (TuplePair) parent.getCurrentKey();

    Tuple groupKey = currentKeyPair.getLhs();

    isNewKey = currentKey == null || groupComparator.compare( currentKey, groupKey ) != 0;
    currentKey = groupKey;
    currentValues = parent.getCurrentValues();

    return true;
    }

  /**
   * @return the {@link TuplePair} key of the parent entry most recently read
   */
  @Override
  public Object getCurrentKey() throws IOException
    {
    return currentKeyPair;
    }

  /**
   * Returns the values of the current group.
   * <p>
   * Every iterator created from the result drives this reader: when the values of one parent entry
   * run out it advances the parent and continues with the next entry's values as long as the
   * grouping key is unchanged.
   *
   * @return an Iterable over the values of the current group
   */
  @Override
  public Iterable<Object> getCurrentValues() throws IOException
    {
    return new Iterable<Object>()
      {
      @Override
      public Iterator<Object> iterator()
        {
        return new Iterator<Object>()
          {
          /** Iterator over the values of the parent entry currently being consumed. */
          private Iterator<Object> delegate = currentValues.iterator();
          /** Latched once the end of the group was seen, see hasNext(). */
          private boolean exhausted = false;

          /**
           * Reports whether the current group has more values, advancing the parent when the values
           * of the current parent entry are used up.
           * <p>
           * The negative result is latched. Without the latch a repeated call would advance the
           * parent a second time, consuming the first entry of the following group and possibly
           * clearing the new-key marker the enclosing reader's next() relies on.
           */
          @Override
          public boolean hasNext()
            {
            if( exhausted )
              return false;

            while( !delegate.hasNext() )
              {
              // no more parent entries, or the entry just read belongs to the next group
              if( !advanceSafe() || isNewKey )
                {
                exhausted = true;
                return false;
                }

              delegate = currentValues.iterator();
              }

            return true;
            }

          @Override
          public Object next()
            {
            return delegate.next();
            }

          @Override
          public void remove()
            {
            delegate.remove();
            }

          /** Calls advance(), rethrowing an IOException as an unchecked CascadingException. */
          protected boolean advanceSafe()
            {
            try
              {
              return advance();
              }
            catch( IOException exception )
              {
              throw new CascadingException( "unable to advance values iterator", exception );
              }
            }
          };
        }
      };
    }
  }