001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tuple.hadoop.util; 022 023import java.io.IOException; 024import java.util.Comparator; 025 026import cascading.CascadingException; 027import cascading.flow.hadoop.util.HadoopUtil; 028import cascading.tuple.Fields; 029import cascading.tuple.StreamComparator; 030import cascading.tuple.Tuple; 031import cascading.tuple.hadoop.TupleSerialization; 032import cascading.tuple.hadoop.io.BufferedInputStream; 033import cascading.tuple.hadoop.io.HadoopTupleInputStream; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.conf.Configured; 036import org.apache.hadoop.io.RawComparator; 037 038/** Class DeserializerComparator is the base class for all Cascading comparator classes. */ 039public abstract class DeserializerComparator<T> extends Configured implements RawComparator<T> 040 { 041 protected BufferedInputStream lhsBuffer; 042 protected BufferedInputStream rhsBuffer; 043 044 protected TupleSerialization tupleSerialization; 045 046 protected HadoopTupleInputStream lhsStream; 047 protected HadoopTupleInputStream rhsStream; 048 049 protected Class[] keyTypes; 050 protected Comparator[] groupComparators; 051 052 boolean hasConfiguredComparators = false; // 053 054 protected boolean canPerformRawComparisons() 055 { 056 return false; 057 } 058 059 protected boolean performRawComparison() 060 { 061 return canPerformRawComparisons() && keyTypes != null && !hasConfiguredComparators; 062 } 063 064 @Override 065 public void setConf( Configuration conf ) 066 { 067 if( conf == null ) 068 return; 069 070 super.setConf( conf ); 071 072 tupleSerialization = new TupleSerialization( conf ); 073 074 keyTypes = tupleSerialization.getKeyTypes(); 075 076 groupComparators = deserializeComparatorsFor( "cascading.group.comparator" ); 077 groupComparators = delegatingComparatorsFor( keyTypes, groupComparators ); 078 079 if( performRawComparison() ) 080 return; 081 082 lhsBuffer = new BufferedInputStream(); 083 rhsBuffer = new BufferedInputStream(); 084 085 // get new readers so deserializers don't compete for the buffer 086 lhsStream = getHadoopTupleInputStream( lhsBuffer, tupleSerialization.getElementReader() ); 087 rhsStream = getHadoopTupleInputStream( rhsBuffer, tupleSerialization.getElementReader() ); 088 } 089 090 protected HadoopTupleInputStream getHadoopTupleInputStream( BufferedInputStream lhsBuffer, TupleSerialization.SerializationElementReader elementReader ) 091 { 092 return new HadoopTupleInputStream( lhsBuffer, elementReader ); 093 } 094 095 Comparator[] deserializeComparatorsFor( String name ) 096 { 097 Configuration conf = getConf(); 098 099 if( conf == null ) 100 throw new IllegalStateException( "no conf set" ); 101 102 return getFieldComparatorsFrom( conf, name ); 103 } 104 105 public static Comparator[] getFieldComparatorsFrom( Configuration conf, String name ) 106 { 107 String value = conf.get( name ); 108 109 if( value == null ) 110 return new Comparator[ conf.getInt( name + ".size", 1 ) ]; 111 112 try 113 { 114 return HadoopUtil.deserializeBase64( value, conf, Fields.class ).getComparators(); 115 } 116 catch( IOException exception ) 117 { 118 throw new CascadingException( "unable to deserialize comparators for: " + name ); 119 } 120 } 121 122 Comparator[] delegatingComparatorsFor( Class[] types, Comparator[] fieldComparators ) 123 { 124 Comparator[] comparators = new Comparator[ fieldComparators.length ]; 125 126 for( int i = 0; i < comparators.length; i++ ) 127 { 128 if( types != null ) 129 { 130 Class type = types[ i ]; 131 132 if( fieldComparators[ i ] != null ) 133 comparators[ i ] = fieldComparators[ i ]; // provided on selector 134 else 135 comparators[ i ] = tupleSerialization.getComparator( type ); // provided via Serialization or conf, may return null 136 137 if( comparators[ i ] != null ) 138 hasConfiguredComparators = true; 139 140 if( comparators[ i ] instanceof StreamComparator ) 141 comparators[ i ] = new TypedTupleElementStreamComparator( type, (StreamComparator) comparators[ i ] ); 142 else 143 comparators[ i ] = new TypedTupleElementComparator( type, comparators[ i ] ); 144 } 145 else 146 { 147 if( fieldComparators[ i ] instanceof StreamComparator ) 148 comparators[ i ] = new TupleElementStreamComparator( (StreamComparator) fieldComparators[ i ] ); 149 else if( fieldComparators[ i ] != null ) 150 comparators[ i ] = new TupleElementComparator( fieldComparators[ i ] ); 151 else 152 comparators[ i ] = new DelegatingTupleElementComparator( tupleSerialization ); // lazy lookup 153 } 154 } 155 156 return comparators; 157 } 158 159 protected final int compareTuples( Comparator[] comparators, Tuple lhs, Tuple rhs ) 160 { 161 int lhsLen = lhs.size(); 162 int rhsLen = rhs.size(); 163 164 int c = lhsLen - rhsLen; 165 166 if( c != 0 ) 167 return c; 168 169 for( int i = 0; i < lhsLen; i++ ) 170 { 171 // hack to support comparators array length of 1 172 Object lhsObject = lhs.getObject( i ); 173 Object rhsObject = rhs.getObject( i ); 174 175 try 176 { 177 c = comparators[ i % comparators.length ].compare( lhsObject, rhsObject ); 178 } 179 catch( Exception exception ) 180 { 181 throw new CascadingException( "unable to compare object elements in position: " + i + " lhs: '" + lhsObject + "' rhs: '" + rhsObject + "'", exception ); 182 } 183 184 if( c != 0 ) 185 return c; 186 } 187 188 return 0; 189 } 190 191 protected final int compareTuples( Class[] types, Comparator[] comparators ) throws IOException 192 { 193 if( types == null ) 194 return compareUnTypedTuples( comparators ); 195 else 196 return compareTypedTuples( types, comparators ); 197 } 198 199 final int compareTypedTuples( Class[] types, Comparator[] comparators ) throws IOException 200 { 201 int c; 202 203 for( int i = 0; i < types.length; i++ ) 204 { 205 try 206 { 207 c = ( (StreamComparator) comparators[ i % comparators.length ] ).compare( lhsStream, rhsStream ); 208 } 209 catch( Exception exception ) 210 { 211 throw new CascadingException( "unable to compare stream elements in position: " + i, exception ); 212 } 213 214 if( c != 0 ) 215 return c; 216 } 217 218 return 0; 219 } 220 221 final int compareUnTypedTuples( Comparator[] comparators ) throws IOException 222 { 223 int lhsLen = lhsStream.getNumElements(); 224 int rhsLen = rhsStream.getNumElements(); 225 226 int c = lhsLen - rhsLen; 227 228 if( c != 0 ) 229 return c; 230 231 for( int i = 0; i < lhsLen; i++ ) 232 { 233 try 234 { 235 c = ( (StreamComparator) comparators[ i % comparators.length ] ).compare( lhsStream, rhsStream ); 236 } 237 catch( Exception exception ) 238 { 239 throw new CascadingException( "unable to compare stream elements in position: " + i, exception ); 240 } 241 242 if( c != 0 ) 243 return c; 244 } 245 246 return 0; 247 } 248 }