001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.stream; 022 023 import java.util.Comparator; 024 025 import cascading.tuple.Fields; 026 import cascading.tuple.Tuple; 027 028 /** 029 * 030 */ 031 public class SparseTupleComparator implements Comparator<Tuple> 032 { 033 private final static Comparator DEFAULT = new NaturalComparator(); 034 035 private static class NaturalComparator implements Comparator<Object> 036 { 037 @Override 038 public int compare( Object lhs, Object rhs ) 039 { 040 if( lhs == null && rhs == null ) 041 return 0; 042 else if( lhs == null ) 043 return -1; 044 else if( rhs == null ) 045 return 1; 046 else 047 return ( (Comparable) lhs ).compareTo( rhs ); // guaranteed to not be null 048 } 049 } 050 051 final Comparator[] comparators; 052 final int[] posMap; 053 054 public SparseTupleComparator( Fields valuesField, Fields sortFields ) 055 { 056 this( valuesField, sortFields, null ); 057 } 058 059 public SparseTupleComparator( Fields groupFields, Comparator defaultComparator ) 060 { 061 this( groupFields, groupFields, defaultComparator ); 062 } 063 064 public SparseTupleComparator( Fields valuesFields, Fields sortFields, Comparator defaultComparator ) 065 { 066 if( defaultComparator == null ) 067 defaultComparator = DEFAULT; 068 069 int size = valuesFields != null && !valuesFields.isUnknown() ? valuesFields.size() : sortFields.size(); 070 comparators = new Comparator[ size ]; 071 posMap = new int[ size ]; 072 073 Comparator[] sortFieldComparators = sortFields.getComparators(); // returns a copy 074 075 for( int i = 0; i < sortFields.size(); i++ ) 076 { 077 Comparable field = sortFields.get( i ); 078 int pos = valuesFields != null ? valuesFields.getPos( field ) : i; 079 080 comparators[ i ] = sortFieldComparators[ i ]; 081 posMap[ i ] = pos; 082 083 if( comparators[ i ] == null ) 084 comparators[ i ] = defaultComparator; 085 } 086 } 087 088 public Comparator[] getComparators() 089 { 090 return comparators; 091 } 092 093 @Override 094 public int compare( Tuple lhs, Tuple rhs ) 095 { 096 for( int i = 0; i < comparators.length; i++ ) 097 { 098 Comparator comparator = comparators[ i ]; 099 100 if( comparator == null ) 101 continue; 102 103 int pos = posMap[ i ]; 104 int c = comparator.compare( lhs.getObject( pos ), rhs.getObject( pos ) ); 105 106 if( c != 0 ) 107 return c; 108 } 109 110 return 0; 111 } 112 }