001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tuple.hadoop.util; 022 023 import java.io.IOException; 024 import java.util.Comparator; 025 026 import cascading.CascadingException; 027 import cascading.tuple.StreamComparator; 028 import cascading.tuple.hadoop.TupleSerialization; 029 import cascading.tuple.io.TupleInputStream; 030 031 /** 032 * 033 */ 034 public class DelegatingTupleElementComparator implements StreamComparator<TupleInputStream>, Comparator<Object> 035 { 036 final TupleSerialization tupleSerialization; 037 Comparator<Object> objectComparator = null; 038 StreamComparator<TupleInputStream> streamComparator = null; 039 040 public DelegatingTupleElementComparator( TupleSerialization tupleSerialization ) 041 { 042 this.tupleSerialization = tupleSerialization; 043 } 044 045 @Override 046 public int compare( Object lhs, Object rhs ) 047 { 048 if( objectComparator == null ) 049 { 050 if( lhs == null && rhs == null ) 051 return 0; 052 053 objectComparator = getComparator( lhs, rhs ); 054 } 055 056 return objectComparator.compare( lhs, rhs ); 057 } 058 059 private Comparator<Object> getComparator( Object lhs, Object rhs ) 060 { 061 Class type = lhs != null ? lhs.getClass() : null; 062 063 type = type == null && rhs != null ? rhs.getClass() : type; 064 065 return new TupleElementComparator( tupleSerialization.getComparator( type ) ); 066 } 067 068 @Override 069 public int compare( TupleInputStream lhsStream, TupleInputStream rhsStream ) 070 { 071 if( streamComparator == null ) 072 streamComparator = getComparator( lhsStream ); 073 074 return streamComparator.compare( lhsStream, rhsStream ); 075 } 076 077 private StreamComparator getComparator( TupleInputStream lhsStream ) 078 { 079 try 080 { 081 lhsStream.mark( 4 * 1024 ); 082 083 Comparator foundComparator = lhsStream.getComparatorFor( lhsStream.readToken() ); 084 085 // grab the configured default comparator, its ok if its null, just wasn't configured externally 086 if( foundComparator == null ) 087 foundComparator = tupleSerialization.getDefaultComparator(); 088 089 if( foundComparator instanceof StreamComparator ) 090 return new TupleElementStreamComparator( (StreamComparator) foundComparator ); 091 else 092 return new TupleElementComparator( foundComparator ); 093 } 094 catch( IOException exception ) 095 { 096 throw new CascadingException( exception ); 097 } 098 finally 099 { 100 try 101 { 102 lhsStream.reset(); 103 } 104 catch( IOException exception ) 105 { 106 throw new CascadingException( exception ); 107 } 108 } 109 } 110 111 }