001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.joiner;
022    
023    import java.util.Arrays;
024    import java.util.Iterator;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.tuple.Fields;
028    import cascading.tuple.Tuple;
029    
030    /**
031     * Class JoinerClosure wraps all incoming tuple streams with iterator instances allowing for just join implementations.
032     * <p/>
033     * This class is provided to a {@link Joiner#getIterator(JoinerClosure)} implementation, or to a {@link cascading.operation.Buffer}
034     * via the {@link cascading.operation.BufferCall#getJoinerClosure()} method.
035     * <p/>
036     * All iterators returned by {@link #getIterator(int)} may be retrieved more than once to restart them except for the left
037     * most iterator at position {@code 0} (zero).
038     * <p/>
039     * This iterator may only be iterated across once. All other iterators are backed by memory and possibly disk.
040     */
041    public abstract class JoinerClosure
042      {
043      protected final FlowProcess flowProcess;
044    
045      protected final Fields[] joinFields;
046      protected final Fields[] valueFields;
047    
048      public JoinerClosure( FlowProcess flowProcess, Fields[] joinFields, Fields[] valueFields )
049        {
050        this.flowProcess = flowProcess;
051        this.joinFields = Arrays.copyOf( joinFields, joinFields.length );
052        this.valueFields = Arrays.copyOf( valueFields, valueFields.length );
053        }
054    
055      public FlowProcess getFlowProcess()
056        {
057        return flowProcess;
058        }
059    
060      /**
061       * Returns an array of {@link Fields} denoting the join fields or keys uses for each incoming pipe.
062       * <p/>
063       * The most left handed pipe will be in array position 0.
064       *
065       * @return an array of Fields
066       */
067      public Fields[] getJoinFields()
068        {
069        return joinFields;
070        }
071    
072      /**
073       * Returns an array of all the incoming fields for each incoming pipe.
074       * <p/>
075       * The most left handed pipe will be in array position 0;
076       *
077       * @return an array of Fields
078       */
079      public Fields[] getValueFields()
080        {
081        return valueFields;
082        }
083    
084      public boolean isSelfJoin()
085        {
086        return valueFields.length == 1 && size() != valueFields.length;
087        }
088    
089      public abstract int size();
090    
091      /**
092       * Returns a Tuple Iterator for the given pipe position. Position 0 is the most left handed pipe passed to the prior
093       * {@link cascading.pipe.CoGroup}.
094       * <p/>
095       * To restart an Iterator over a given pipe, this method must be called again.
096       *
097       * @param pos of type int
098       * @return an Iterator of Tuple instances.
099       */
100      public abstract Iterator<Tuple> getIterator( int pos );
101    
102      public abstract boolean isEmpty( int pos );
103    
104      public abstract Tuple getGroupTuple( Tuple keysTuple );
105      }