001/* 002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.pipe.joiner; 023 024import java.beans.ConstructorProperties; 025import java.util.Arrays; 026import java.util.Iterator; 027 028import cascading.tuple.Fields; 029import cascading.tuple.Tuple; 030import cascading.tuple.Tuples; 031import cascading.tuple.util.TupleViews; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * Class InnerJoin will return an {@link Iterator} that will iterate over a given {@link Joiner} and return tuples that represent 037 * and inner join of the CoGrouper internal grouped tuple collections. 038 * <p> 039 * Joins perform based on the equality of the join keys. In the case of null values, Java treats two 040 * null values as equivalent. SQL does not treat null values as equal. To produce SQL like results in a given 041 * join, a new {@link java.util.Comparator} will need to be used on the joined values to prevent null from 042 * equaling null. As a convenience, see the {@link cascading.util.NullNotEquivalentComparator} class. 043 */ 044public class InnerJoin extends BaseJoiner 045 { 046 /** Field LOG */ 047 private static final Logger LOG = LoggerFactory.getLogger( InnerJoin.class ); 048 049 public InnerJoin() 050 { 051 } 052 053 @ConstructorProperties({"fieldDeclaration"}) 054 public InnerJoin( Fields fieldDeclaration ) 055 { 056 super( fieldDeclaration ); 057 } 058 059 public Iterator<Tuple> getIterator( JoinerClosure closure ) 060 { 061 return new JoinIterator( closure ); 062 } 063 064 public int numJoins() 065 { 066 return -1; 067 } 068 069 public static class JoinIterator implements Iterator<Tuple> 070 { 071 final JoinerClosure closure; 072 Iterator[] iterators; 073 Tuple[] lastValues; 074 075 TupleBuilder resultBuilder; 076 Tuple result = new Tuple(); // will be replaced 077 078 public JoinIterator( JoinerClosure closure ) 079 { 080 this.closure = closure; 081 082 LOG.debug( "cogrouped size: {}", closure.size() ); 083 084 init(); 085 } 086 087 protected void init() 088 { 089 iterators = new Iterator[ closure.size() ]; 090 091 for( int i = 0; i < closure.size(); i++ ) 092 iterators[ i ] = getIterator( i ); 093 094 boolean isUnknown = false; 095 096 for( Fields fields : closure.getValueFields() ) 097 isUnknown |= fields.isUnknown(); 098 099 if( isUnknown ) 100 resultBuilder = new TupleBuilder() 101 { 102 Tuple result = new Tuple(); // is re-used 103 104 @Override 105 public Tuple makeResult( Tuple[] tuples ) 106 { 107 result.clear(); 108 109 // flatten the results into one Tuple 110 for( Tuple lastValue : tuples ) 111 result.addAll( lastValue ); 112 113 return result; 114 } 115 }; 116 else 117 resultBuilder = new TupleBuilder() 118 { 119 Tuple result; 120 121 { 122 // handle self join. 123 Fields[] fields = closure.getValueFields(); 124 125 if( closure.isSelfJoin() ) 126 { 127 fields = new Fields[ closure.size() ]; 128 129 Arrays.fill( fields, closure.getValueFields()[ 0 ] ); 130 } 131 132 result = TupleViews.createComposite( fields ); 133 } 134 135 @Override 136 public Tuple makeResult( Tuple[] tuples ) 137 { 138 return TupleViews.reset( result, tuples ); 139 } 140 }; 141 } 142 143 protected Iterator getIterator( int i ) 144 { 145 return closure.getIterator( i ); 146 } 147 148 private Tuple[] initLastValues() 149 { 150 lastValues = new Tuple[ iterators.length ]; 151 152 for( int i = 0; i < iterators.length; i++ ) 153 lastValues[ i ] = (Tuple) iterators[ i ].next(); 154 155 return lastValues; 156 } 157 158 public final boolean hasNext() 159 { 160 // if this is the first pass, and there is an iterator without a next value, 161 // then we have no next element 162 if( lastValues == null ) 163 { 164 for( Iterator iterator : iterators ) 165 { 166 if( !iterator.hasNext() ) 167 return false; 168 } 169 170 return true; 171 } 172 173 for( Iterator iterator : iterators ) 174 { 175 if( iterator.hasNext() ) 176 return true; 177 } 178 179 return false; 180 } 181 182 public Tuple next() 183 { 184 if( lastValues == null ) 185 return makeResult( initLastValues() ); 186 187 for( int i = iterators.length - 1; i >= 0; i-- ) 188 { 189 if( iterators[ i ].hasNext() ) 190 { 191 lastValues[ i ] = (Tuple) iterators[ i ].next(); 192 break; 193 } 194 195 // reset to first 196 iterators[ i ] = getIterator( i ); 197 lastValues[ i ] = (Tuple) iterators[ i ].next(); 198 } 199 200 return makeResult( lastValues ); 201 } 202 203 private Tuple makeResult( Tuple[] lastValues ) 204 { 205 Tuples.asModifiable( result ); 206 207 result = resultBuilder.makeResult( lastValues ); 208 209 if( LOG.isTraceEnabled() ) 210 LOG.trace( "tuple: {}", result.print() ); 211 212 return result; 213 } 214 215 public void remove() 216 { 217 // unsupported 218 } 219 } 220 221 static interface TupleBuilder 222 { 223 Tuple makeResult( Tuple[] tuples ); 224 } 225 }