001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.hadoop.util;
022    
023    import java.io.IOException;
024    import java.util.Comparator;
025    
026    import cascading.CascadingException;
027    import cascading.flow.hadoop.util.HadoopUtil;
028    import cascading.tuple.Fields;
029    import cascading.tuple.StreamComparator;
030    import cascading.tuple.Tuple;
031    import cascading.tuple.hadoop.TupleSerialization;
032    import cascading.tuple.hadoop.io.BufferedInputStream;
033    import cascading.tuple.hadoop.io.HadoopTupleInputStream;
034    import org.apache.hadoop.conf.Configuration;
035    import org.apache.hadoop.conf.Configured;
036    import org.apache.hadoop.io.RawComparator;
037    
038    /** Class DeserializerComparator is the base class for all Cascading comparator classes. */
039    public abstract class DeserializerComparator<T> extends Configured implements RawComparator<T>
040      {
041      final BufferedInputStream lhsBuffer = new BufferedInputStream();
042      final BufferedInputStream rhsBuffer = new BufferedInputStream();
043    
044      TupleSerialization tupleSerialization;
045    
046      HadoopTupleInputStream lhsStream;
047      HadoopTupleInputStream rhsStream;
048    
049      Comparator[] groupComparators;
050    
051      @Override
052      public void setConf( Configuration conf )
053        {
054        if( conf == null )
055          return;
056    
057        super.setConf( conf );
058    
059        tupleSerialization = new TupleSerialization( conf );
060    
061        // get new readers so deserializers don't compete for the buffer
062        lhsStream = new HadoopTupleInputStream( lhsBuffer, tupleSerialization.getElementReader() );
063        rhsStream = new HadoopTupleInputStream( rhsBuffer, tupleSerialization.getElementReader() );
064    
065        groupComparators = deserializeComparatorsFor( "cascading.group.comparator" );
066        groupComparators = delegatingComparatorsFor( groupComparators );
067        }
068    
069      Comparator[] deserializeComparatorsFor( String name )
070        {
071        Configuration conf = getConf();
072    
073        if( conf == null )
074          throw new IllegalStateException( "no conf set" );
075    
076        return getFieldComparatorsFrom( conf, name );
077        }
078    
079      public static Comparator[] getFieldComparatorsFrom( Configuration conf, String name )
080        {
081        String value = conf.get( name );
082    
083        if( value == null )
084          return new Comparator[ conf.getInt( name + ".size", 1 ) ];
085    
086        try
087          {
088          return HadoopUtil.deserializeBase64( value, conf, Fields.class ).getComparators();
089          }
090        catch( IOException exception )
091          {
092          throw new CascadingException( "unable to deserialize comparators for: " + name );
093          }
094        }
095    
096      Comparator[] delegatingComparatorsFor( Comparator[] fieldComparators )
097        {
098        Comparator[] comparators = new Comparator[ fieldComparators.length ];
099    
100        for( int i = 0; i < comparators.length; i++ )
101          {
102          if( fieldComparators[ i ] instanceof StreamComparator )
103            comparators[ i ] = new TupleElementStreamComparator( (StreamComparator) fieldComparators[ i ] );
104          else if( fieldComparators[ i ] != null )
105            comparators[ i ] = new TupleElementComparator( fieldComparators[ i ] );
106          else
107            comparators[ i ] = new DelegatingTupleElementComparator( tupleSerialization );
108          }
109    
110        return comparators;
111        }
112    
113      final int compareTuples( Comparator[] comparators, Tuple lhs, Tuple rhs )
114        {
115        int lhsLen = lhs.size();
116        int rhsLen = rhs.size();
117    
118        int c = lhsLen - rhsLen;
119    
120        if( c != 0 )
121          return c;
122    
123        for( int i = 0; i < lhsLen; i++ )
124          {
125          // hack to support comparators array length of 1
126          Object lhsObject = lhs.getObject( i );
127          Object rhsObject = rhs.getObject( i );
128    
129          try
130            {
131            c = comparators[ i % comparators.length ].compare( lhsObject, rhsObject );
132            }
133          catch( Exception exception )
134            {
135            throw new CascadingException( "unable to compare object elements in position: " + i + " lhs: '" + lhsObject + "' rhs: '" + rhsObject + "'", exception );
136            }
137    
138          if( c != 0 )
139            return c;
140          }
141    
142        return 0;
143        }
144    
145      final int compareTuples( Comparator[] comparators ) throws IOException
146        {
147        int lhsLen = lhsStream.getNumElements();
148        int rhsLen = rhsStream.getNumElements();
149    
150        int c = lhsLen - rhsLen;
151    
152        if( c != 0 )
153          return c;
154    
155        for( int i = 0; i < lhsLen; i++ )
156          {
157          // hack to support comparators array length of 1
158          try
159            {
160            c = ( (StreamComparator) comparators[ i % comparators.length ] ).compare( lhsStream, rhsStream );
161            }
162          catch( Exception exception )
163            {
164            throw new CascadingException( "unable to compare stream elements in position: " + i, exception );
165            }
166    
167          if( c != 0 )
168            return c;
169          }
170    
171        return 0;
172        }
173      }