001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.pipe.joiner;
023
import java.beans.ConstructorProperties;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.Tuples;
import cascading.tuple.util.TupleViews;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
034
035/**
036 * Class InnerJoin will return an {@link Iterator} that will iterate over a given {@link Joiner} and return tuples that represent
037 * and inner join of the CoGrouper internal grouped tuple collections.
038 * <p>
039 * Joins perform based on the equality of the join keys. In the case of null values, Java treats two
040 * null values as equivalent. SQL does not treat null values as equal. To produce SQL like results in a given
041 * join, a new {@link java.util.Comparator} will need to be used on the joined values to prevent null from
042 * equaling null. As a convenience, see the {@link cascading.util.NullNotEquivalentComparator} class.
043 */
044public class InnerJoin extends BaseJoiner
045  {
046  /** Field LOG */
047  private static final Logger LOG = LoggerFactory.getLogger( InnerJoin.class );
048
049  public InnerJoin()
050    {
051    }
052
053  @ConstructorProperties({"fieldDeclaration"})
054  public InnerJoin( Fields fieldDeclaration )
055    {
056    super( fieldDeclaration );
057    }
058
059  public Iterator<Tuple> getIterator( JoinerClosure closure )
060    {
061    return new JoinIterator( closure );
062    }
063
064  public int numJoins()
065    {
066    return -1;
067    }
068
069  public static class JoinIterator implements Iterator<Tuple>
070    {
071    final JoinerClosure closure;
072    Iterator[] iterators;
073    Tuple[] lastValues;
074
075    TupleBuilder resultBuilder;
076    Tuple result = new Tuple(); // will be replaced
077
078    public JoinIterator( JoinerClosure closure )
079      {
080      this.closure = closure;
081
082      LOG.debug( "cogrouped size: {}", closure.size() );
083
084      init();
085      }
086
087    protected void init()
088      {
089      iterators = new Iterator[ closure.size() ];
090
091      for( int i = 0; i < closure.size(); i++ )
092        iterators[ i ] = getIterator( i );
093
094      boolean isUnknown = false;
095
096      for( Fields fields : closure.getValueFields() )
097        isUnknown |= fields.isUnknown();
098
099      if( isUnknown )
100        resultBuilder = new TupleBuilder()
101          {
102          Tuple result = new Tuple(); // is re-used
103
104          @Override
105          public Tuple makeResult( Tuple[] tuples )
106            {
107            result.clear();
108
109            // flatten the results into one Tuple
110            for( Tuple lastValue : tuples )
111              result.addAll( lastValue );
112
113            return result;
114            }
115          };
116      else
117        resultBuilder = new TupleBuilder()
118          {
119          Tuple result;
120
121          {
122          // handle self join.
123          Fields[] fields = closure.getValueFields();
124
125          if( closure.isSelfJoin() )
126            {
127            fields = new Fields[ closure.size() ];
128
129            Arrays.fill( fields, closure.getValueFields()[ 0 ] );
130            }
131
132          result = TupleViews.createComposite( fields );
133          }
134
135          @Override
136          public Tuple makeResult( Tuple[] tuples )
137            {
138            return TupleViews.reset( result, tuples );
139            }
140          };
141      }
142
143    protected Iterator getIterator( int i )
144      {
145      return closure.getIterator( i );
146      }
147
148    private Tuple[] initLastValues()
149      {
150      lastValues = new Tuple[ iterators.length ];
151
152      for( int i = 0; i < iterators.length; i++ )
153        lastValues[ i ] = (Tuple) iterators[ i ].next();
154
155      return lastValues;
156      }
157
158    public final boolean hasNext()
159      {
160      // if this is the first pass, and there is an iterator without a next value,
161      // then we have no next element
162      if( lastValues == null )
163        {
164        for( Iterator iterator : iterators )
165          {
166          if( !iterator.hasNext() )
167            return false;
168          }
169
170        return true;
171        }
172
173      for( Iterator iterator : iterators )
174        {
175        if( iterator.hasNext() )
176          return true;
177        }
178
179      return false;
180      }
181
182    public Tuple next()
183      {
184      if( lastValues == null )
185        return makeResult( initLastValues() );
186
187      for( int i = iterators.length - 1; i >= 0; i-- )
188        {
189        if( iterators[ i ].hasNext() )
190          {
191          lastValues[ i ] = (Tuple) iterators[ i ].next();
192          break;
193          }
194
195        // reset to first
196        iterators[ i ] = getIterator( i );
197        lastValues[ i ] = (Tuple) iterators[ i ].next();
198        }
199
200      return makeResult( lastValues );
201      }
202
203    private Tuple makeResult( Tuple[] lastValues )
204      {
205      Tuples.asModifiable( result );
206
207      result = resultBuilder.makeResult( lastValues );
208
209      if( LOG.isTraceEnabled() )
210        LOG.trace( "tuple: {}", result.print() );
211
212      return result;
213      }
214
215    public void remove()
216      {
217      // unsupported
218      }
219    }
220
221  static interface TupleBuilder
222    {
223    Tuple makeResult( Tuple[] tuples );
224    }
225  }