001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop.util;
022
023import java.io.IOException;
024import java.util.Comparator;
025
026import cascading.CascadingException;
027import cascading.flow.hadoop.util.HadoopUtil;
028import cascading.tuple.Fields;
029import cascading.tuple.StreamComparator;
030import cascading.tuple.Tuple;
031import cascading.tuple.hadoop.TupleSerialization;
032import cascading.tuple.hadoop.io.BufferedInputStream;
033import cascading.tuple.hadoop.io.HadoopTupleInputStream;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.conf.Configured;
036import org.apache.hadoop.io.RawComparator;
037
038/** Class DeserializerComparator is the base class for all Cascading comparator classes. */
039public abstract class DeserializerComparator<T> extends Configured implements RawComparator<T>
040  {
041  protected BufferedInputStream lhsBuffer;
042  protected BufferedInputStream rhsBuffer;
043
044  protected TupleSerialization tupleSerialization;
045
046  protected HadoopTupleInputStream lhsStream;
047  protected HadoopTupleInputStream rhsStream;
048
049  protected Class[] keyTypes;
050  protected Comparator[] groupComparators;
051
052  boolean hasConfiguredComparators = false; //
053
054  protected boolean canPerformRawComparisons()
055    {
056    return false;
057    }
058
059  protected boolean performRawComparison()
060    {
061    return canPerformRawComparisons() && keyTypes != null && !hasConfiguredComparators;
062    }
063
064  @Override
065  public void setConf( Configuration conf )
066    {
067    if( conf == null )
068      return;
069
070    super.setConf( conf );
071
072    tupleSerialization = new TupleSerialization( conf );
073
074    keyTypes = tupleSerialization.getKeyTypes();
075
076    groupComparators = deserializeComparatorsFor( "cascading.group.comparator" );
077    groupComparators = delegatingComparatorsFor( keyTypes, groupComparators );
078
079    if( performRawComparison() )
080      return;
081
082    lhsBuffer = new BufferedInputStream();
083    rhsBuffer = new BufferedInputStream();
084
085    // get new readers so deserializers don't compete for the buffer
086    lhsStream = getHadoopTupleInputStream( lhsBuffer, tupleSerialization.getElementReader() );
087    rhsStream = getHadoopTupleInputStream( rhsBuffer, tupleSerialization.getElementReader() );
088    }
089
090  protected HadoopTupleInputStream getHadoopTupleInputStream( BufferedInputStream lhsBuffer, TupleSerialization.SerializationElementReader elementReader )
091    {
092    return new HadoopTupleInputStream( lhsBuffer, elementReader );
093    }
094
095  Comparator[] deserializeComparatorsFor( String name )
096    {
097    Configuration conf = getConf();
098
099    if( conf == null )
100      throw new IllegalStateException( "no conf set" );
101
102    return getFieldComparatorsFrom( conf, name );
103    }
104
105  public static Comparator[] getFieldComparatorsFrom( Configuration conf, String name )
106    {
107    String value = conf.get( name );
108
109    if( value == null )
110      return new Comparator[ conf.getInt( name + ".size", 1 ) ];
111
112    try
113      {
114      return HadoopUtil.deserializeBase64( value, conf, Fields.class ).getComparators();
115      }
116    catch( IOException exception )
117      {
118      throw new CascadingException( "unable to deserialize comparators for: " + name );
119      }
120    }
121
122  Comparator[] delegatingComparatorsFor( Class[] types, Comparator[] fieldComparators )
123    {
124    Comparator[] comparators = new Comparator[ fieldComparators.length ];
125
126    for( int i = 0; i < comparators.length; i++ )
127      {
128      if( types != null )
129        {
130        Class type = types[ i ];
131
132        if( fieldComparators[ i ] != null )
133          comparators[ i ] = fieldComparators[ i ]; // provided on selector
134        else
135          comparators[ i ] = tupleSerialization.getComparator( type ); // provided via Serialization or conf, may return null
136
137        if( comparators[ i ] != null )
138          hasConfiguredComparators = true;
139
140        if( comparators[ i ] instanceof StreamComparator )
141          comparators[ i ] = new TypedTupleElementStreamComparator( type, (StreamComparator) comparators[ i ] );
142        else
143          comparators[ i ] = new TypedTupleElementComparator( type, comparators[ i ] );
144        }
145      else
146        {
147        if( fieldComparators[ i ] instanceof StreamComparator )
148          comparators[ i ] = new TupleElementStreamComparator( (StreamComparator) fieldComparators[ i ] );
149        else if( fieldComparators[ i ] != null )
150          comparators[ i ] = new TupleElementComparator( fieldComparators[ i ] );
151        else
152          comparators[ i ] = new DelegatingTupleElementComparator( tupleSerialization ); // lazy lookup
153        }
154      }
155
156    return comparators;
157    }
158
159  protected final int compareTuples( Comparator[] comparators, Tuple lhs, Tuple rhs )
160    {
161    int lhsLen = lhs.size();
162    int rhsLen = rhs.size();
163
164    int c = lhsLen - rhsLen;
165
166    if( c != 0 )
167      return c;
168
169    for( int i = 0; i < lhsLen; i++ )
170      {
171      // hack to support comparators array length of 1
172      Object lhsObject = lhs.getObject( i );
173      Object rhsObject = rhs.getObject( i );
174
175      try
176        {
177        c = comparators[ i % comparators.length ].compare( lhsObject, rhsObject );
178        }
179      catch( Exception exception )
180        {
181        throw new CascadingException( "unable to compare object elements in position: " + i + " lhs: '" + lhsObject + "' rhs: '" + rhsObject + "'", exception );
182        }
183
184      if( c != 0 )
185        return c;
186      }
187
188    return 0;
189    }
190
191  protected final int compareTuples( Class[] types, Comparator[] comparators ) throws IOException
192    {
193    if( types == null )
194      return compareUnTypedTuples( comparators );
195    else
196      return compareTypedTuples( types, comparators );
197    }
198
199  final int compareTypedTuples( Class[] types, Comparator[] comparators ) throws IOException
200    {
201    int c;
202
203    for( int i = 0; i < types.length; i++ )
204      {
205      try
206        {
207        c = ( (StreamComparator) comparators[ i % comparators.length ] ).compare( lhsStream, rhsStream );
208        }
209      catch( Exception exception )
210        {
211        throw new CascadingException( "unable to compare stream elements in position: " + i, exception );
212        }
213
214      if( c != 0 )
215        return c;
216      }
217
218    return 0;
219    }
220
221  final int compareUnTypedTuples( Comparator[] comparators ) throws IOException
222    {
223    int lhsLen = lhsStream.getNumElements();
224    int rhsLen = rhsStream.getNumElements();
225
226    int c = lhsLen - rhsLen;
227
228    if( c != 0 )
229      return c;
230
231    for( int i = 0; i < lhsLen; i++ )
232      {
233      try
234        {
235        c = ( (StreamComparator) comparators[ i % comparators.length ] ).compare( lhsStream, rhsStream );
236        }
237      catch( Exception exception )
238        {
239        throw new CascadingException( "unable to compare stream elements in position: " + i, exception );
240        }
241
242      if( c != 0 )
243        return c;
244      }
245
246    return 0;
247    }
248  }