001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple.hadoop.util;
022
023import java.io.IOException;
024import java.util.Comparator;
025
026import cascading.CascadingException;
027import cascading.flow.hadoop.util.HadoopUtil;
028import cascading.tuple.Fields;
029import cascading.tuple.StreamComparator;
030import cascading.tuple.Tuple;
031import cascading.tuple.hadoop.TupleSerialization;
032import cascading.tuple.hadoop.io.BufferedInputStream;
033import cascading.tuple.hadoop.io.HadoopTupleInputStream;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.conf.Configured;
036import org.apache.hadoop.io.RawComparator;
037
038/** Class DeserializerComparator is the base class for all Cascading comparator classes. */
039public abstract class DeserializerComparator<T> extends Configured implements RawComparator<T>
040  {
041  final BufferedInputStream lhsBuffer = new BufferedInputStream();
042  final BufferedInputStream rhsBuffer = new BufferedInputStream();
043
044  TupleSerialization tupleSerialization;
045
046  HadoopTupleInputStream lhsStream;
047  HadoopTupleInputStream rhsStream;
048
049  Comparator[] groupComparators;
050
051  @Override
052  public void setConf( Configuration conf )
053    {
054    if( conf == null )
055      return;
056
057    super.setConf( conf );
058
059    tupleSerialization = new TupleSerialization( conf );
060
061    // get new readers so deserializers don't compete for the buffer
062    lhsStream = new HadoopTupleInputStream( lhsBuffer, tupleSerialization.getElementReader() );
063    rhsStream = new HadoopTupleInputStream( rhsBuffer, tupleSerialization.getElementReader() );
064
065    groupComparators = deserializeComparatorsFor( "cascading.group.comparator" );
066    groupComparators = delegatingComparatorsFor( groupComparators );
067    }
068
069  Comparator[] deserializeComparatorsFor( String name )
070    {
071    Configuration conf = getConf();
072
073    if( conf == null )
074      throw new IllegalStateException( "no conf set" );
075
076    return getFieldComparatorsFrom( conf, name );
077    }
078
079  public static Comparator[] getFieldComparatorsFrom( Configuration conf, String name )
080    {
081    String value = conf.get( name );
082
083    if( value == null )
084      return new Comparator[ conf.getInt( name + ".size", 1 ) ];
085
086    try
087      {
088      return HadoopUtil.deserializeBase64( value, conf, Fields.class ).getComparators();
089      }
090    catch( IOException exception )
091      {
092      throw new CascadingException( "unable to deserialize comparators for: " + name );
093      }
094    }
095
096  Comparator[] delegatingComparatorsFor( Comparator[] fieldComparators )
097    {
098    Comparator[] comparators = new Comparator[ fieldComparators.length ];
099
100    for( int i = 0; i < comparators.length; i++ )
101      {
102      if( fieldComparators[ i ] instanceof StreamComparator )
103        comparators[ i ] = new TupleElementStreamComparator( (StreamComparator) fieldComparators[ i ] );
104      else if( fieldComparators[ i ] != null )
105        comparators[ i ] = new TupleElementComparator( fieldComparators[ i ] );
106      else
107        comparators[ i ] = new DelegatingTupleElementComparator( tupleSerialization );
108      }
109
110    return comparators;
111    }
112
113  final int compareTuples( Comparator[] comparators, Tuple lhs, Tuple rhs )
114    {
115    int lhsLen = lhs.size();
116    int rhsLen = rhs.size();
117
118    int c = lhsLen - rhsLen;
119
120    if( c != 0 )
121      return c;
122
123    for( int i = 0; i < lhsLen; i++ )
124      {
125      // hack to support comparators array length of 1
126      Object lhsObject = lhs.getObject( i );
127      Object rhsObject = rhs.getObject( i );
128
129      try
130        {
131        c = comparators[ i % comparators.length ].compare( lhsObject, rhsObject );
132        }
133      catch( Exception exception )
134        {
135        throw new CascadingException( "unable to compare object elements in position: " + i + " lhs: '" + lhsObject + "' rhs: '" + rhsObject + "'", exception );
136        }
137
138      if( c != 0 )
139        return c;
140      }
141
142    return 0;
143    }
144
145  final int compareTuples( Comparator[] comparators ) throws IOException
146    {
147    int lhsLen = lhsStream.getNumElements();
148    int rhsLen = rhsStream.getNumElements();
149
150    int c = lhsLen - rhsLen;
151
152    if( c != 0 )
153      return c;
154
155    for( int i = 0; i < lhsLen; i++ )
156      {
157      // hack to support comparators array length of 1
158      try
159        {
160        c = ( (StreamComparator) comparators[ i % comparators.length ] ).compare( lhsStream, rhsStream );
161        }
162      catch( Exception exception )
163        {
164        throw new CascadingException( "unable to compare stream elements in position: " + i, exception );
165        }
166
167      if( c != 0 )
168        return c;
169      }
170
171    return 0;
172    }
173  }