001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.util.Comparator;
024    
025    import cascading.tuple.Fields;
026    import cascading.tuple.Tuple;
027    
028    /**
029     *
030     */
031    public class SparseTupleComparator implements Comparator<Tuple>
032      {
033      private final static Comparator DEFAULT = new NaturalComparator();
034    
035      private static class NaturalComparator implements Comparator<Object>
036        {
037        @Override
038        public int compare( Object lhs, Object rhs )
039          {
040          if( lhs == null && rhs == null )
041            return 0;
042          else if( lhs == null )
043            return -1;
044          else if( rhs == null )
045            return 1;
046          else
047            return ( (Comparable) lhs ).compareTo( rhs ); // guaranteed to not be null
048          }
049        }
050    
051      final Comparator[] comparators;
052      final int[] posMap;
053    
054      public SparseTupleComparator( Fields valuesField, Fields sortFields )
055        {
056        this( valuesField, sortFields, null );
057        }
058    
059      public SparseTupleComparator( Fields groupFields, Comparator defaultComparator )
060        {
061        this( groupFields, groupFields, defaultComparator );
062        }
063    
064      public SparseTupleComparator( Fields valuesFields, Fields sortFields, Comparator defaultComparator )
065        {
066        if( defaultComparator == null )
067          defaultComparator = DEFAULT;
068    
069        int size = valuesFields != null && !valuesFields.isUnknown() ? valuesFields.size() : sortFields.size();
070        comparators = new Comparator[ size ];
071        posMap = new int[ size ];
072    
073        Comparator[] sortFieldComparators = sortFields.getComparators(); // returns a copy
074    
075        for( int i = 0; i < sortFields.size(); i++ )
076          {
077          Comparable field = sortFields.get( i );
078          int pos = valuesFields != null ? valuesFields.getPos( field ) : i;
079    
080          comparators[ i ] = sortFieldComparators[ i ];
081          posMap[ i ] = pos;
082    
083          if( comparators[ i ] == null )
084            comparators[ i ] = defaultComparator;
085          }
086        }
087    
088      public Comparator[] getComparators()
089        {
090        return comparators;
091        }
092    
093      @Override
094      public int compare( Tuple lhs, Tuple rhs )
095        {
096        for( int i = 0; i < comparators.length; i++ )
097          {
098          Comparator comparator = comparators[ i ];
099    
100          if( comparator == null )
101            continue;
102    
103          int pos = posMap[ i ];
104          int c = comparator.compare( lhs.getObject( pos ), rhs.getObject( pos ) );
105    
106          if( c != 0 )
107            return c;
108          }
109    
110        return 0;
111        }
112      }