001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.joiner;
022    
import java.beans.ConstructorProperties;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.Tuples;
import cascading.tuple.util.TupleViews;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
033    
034    /**
035     * Class InnerJoin will return an {@link Iterator} that will iterate over a given {@link Joiner} and return tuples that represent
036     * and inner join of the CoGrouper internal grouped tuple collections.
037     * <p/>
038     * Joins perform based on the equality of the join keys. In the case of null values, Java treats two
039     * null values as equivalent. SQL does not treat null values as equal. To produce SQL like results in a given
040     * join, a new {@link java.util.Comparator} will need to be used on the joined values to prevent null from
041     * equaling null. As a convenience, see the {@link cascading.util.NullNotEquivalentComparator} class.
042     */
043    public class InnerJoin extends BaseJoiner
044      {
045      /** Field LOG */
046      private static final Logger LOG = LoggerFactory.getLogger( InnerJoin.class );
047    
048      public InnerJoin()
049        {
050        }
051    
052      @ConstructorProperties({"fieldDeclaration"})
053      public InnerJoin( Fields fieldDeclaration )
054        {
055        super( fieldDeclaration );
056        }
057    
058      public Iterator<Tuple> getIterator( JoinerClosure closure )
059        {
060        return new JoinIterator( closure );
061        }
062    
063      public int numJoins()
064        {
065        return -1;
066        }
067    
068      public static class JoinIterator implements Iterator<Tuple>
069        {
070        final JoinerClosure closure;
071        Iterator[] iterators;
072        Tuple[] lastValues;
073    
074        TupleBuilder resultBuilder;
075        Tuple result = new Tuple(); // will be replaced
076    
077        public JoinIterator( JoinerClosure closure )
078          {
079          this.closure = closure;
080    
081          LOG.debug( "cogrouped size: {}", closure.size() );
082    
083          init();
084          }
085    
086        protected void init()
087          {
088          iterators = new Iterator[ closure.size() ];
089    
090          for( int i = 0; i < closure.size(); i++ )
091            iterators[ i ] = getIterator( i );
092    
093          boolean isUnknown = false;
094    
095          for( Fields fields : closure.getValueFields() )
096            isUnknown |= fields.isUnknown();
097    
098          if( isUnknown )
099            resultBuilder = new TupleBuilder()
100            {
101            Tuple result = new Tuple(); // is re-used
102    
103            @Override
104            public Tuple makeResult( Tuple[] tuples )
105              {
106              result.clear();
107    
108              // flatten the results into one Tuple
109              for( Tuple lastValue : tuples )
110                result.addAll( lastValue );
111    
112              return result;
113              }
114            };
115          else
116            resultBuilder = new TupleBuilder()
117            {
118            Tuple result;
119    
120            {
121            // handle self join.
122            Fields[] fields = closure.getValueFields();
123    
124            if( closure.isSelfJoin() )
125              {
126              fields = new Fields[ closure.size() ];
127    
128              Arrays.fill( fields, closure.getValueFields()[ 0 ] );
129              }
130    
131            result = TupleViews.createComposite( fields );
132            }
133    
134            @Override
135            public Tuple makeResult( Tuple[] tuples )
136              {
137              return TupleViews.reset( result, tuples );
138              }
139            };
140          }
141    
142        protected Iterator getIterator( int i )
143          {
144          return closure.getIterator( i );
145          }
146    
147        private Tuple[] initLastValues()
148          {
149          lastValues = new Tuple[ iterators.length ];
150    
151          for( int i = 0; i < iterators.length; i++ )
152            lastValues[ i ] = (Tuple) iterators[ i ].next();
153    
154          return lastValues;
155          }
156    
157        public final boolean hasNext()
158          {
159          // if this is the first pass, and there is an iterator without a next value,
160          // then we have no next element
161          if( lastValues == null )
162            {
163            for( Iterator iterator : iterators )
164              {
165              if( !iterator.hasNext() )
166                return false;
167              }
168    
169            return true;
170            }
171    
172          for( Iterator iterator : iterators )
173            {
174            if( iterator.hasNext() )
175              return true;
176            }
177    
178          return false;
179          }
180    
181        public Tuple next()
182          {
183          if( lastValues == null )
184            return makeResult( initLastValues() );
185    
186          for( int i = iterators.length - 1; i >= 0; i-- )
187            {
188            if( iterators[ i ].hasNext() )
189              {
190              lastValues[ i ] = (Tuple) iterators[ i ].next();
191              break;
192              }
193    
194            // reset to first
195            iterators[ i ] = getIterator( i );
196            lastValues[ i ] = (Tuple) iterators[ i ].next();
197            }
198    
199          return makeResult( lastValues );
200          }
201    
202        private Tuple makeResult( Tuple[] lastValues )
203          {
204          Tuples.asModifiable( result );
205    
206          result = resultBuilder.makeResult( lastValues );
207    
208          if( LOG.isTraceEnabled() )
209            LOG.trace( "tuple: {}", result.print() );
210    
211          return result;
212          }
213    
214        public void remove()
215          {
216          // unsupported
217          }
218        }
219    
220      static interface TupleBuilder
221        {
222        Tuple makeResult( Tuple[] tuples );
223        }
224      }