001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe.joiner;
022
023import java.util.Arrays;
024import java.util.Iterator;
025
026import cascading.flow.FlowProcess;
027import cascading.tuple.Fields;
028import cascading.tuple.Tuple;
029
030/**
031 * Class JoinerClosure wraps all incoming tuple streams with iterator instances allowing for just join implementations.
032 * <p/>
033 * This class is provided to a {@link Joiner#getIterator(JoinerClosure)} implementation, or to a {@link cascading.operation.Buffer}
034 * via the {@link cascading.operation.BufferCall#getJoinerClosure()} method.
035 * <p/>
036 * All iterators returned by {@link #getIterator(int)} may be retrieved more than once to restart them except for the left
037 * most iterator at position {@code 0} (zero).
038 * <p/>
039 * This iterator may only be iterated across once. All other iterators are backed by memory and possibly disk.
040 */
041public abstract class JoinerClosure
042  {
043  protected final FlowProcess flowProcess;
044
045  protected final Fields[] joinFields;
046  protected final Fields[] valueFields;
047
048  public JoinerClosure( FlowProcess flowProcess, Fields[] joinFields, Fields[] valueFields )
049    {
050    this.flowProcess = flowProcess;
051    this.joinFields = Arrays.copyOf( joinFields, joinFields.length );
052    this.valueFields = Arrays.copyOf( valueFields, valueFields.length );
053    }
054
055  public FlowProcess getFlowProcess()
056    {
057    return flowProcess;
058    }
059
060  /**
061   * Returns an array of {@link Fields} denoting the join fields or keys uses for each incoming pipe.
062   * <p/>
063   * The most left handed pipe will be in array position 0.
064   *
065   * @return an array of Fields
066   */
067  public Fields[] getJoinFields()
068    {
069    return joinFields;
070    }
071
072  /**
073   * Returns an array of all the incoming fields for each incoming pipe.
074   * <p/>
075   * The most left handed pipe will be in array position 0;
076   *
077   * @return an array of Fields
078   */
079  public Fields[] getValueFields()
080    {
081    return valueFields;
082    }
083
084  public boolean isSelfJoin()
085    {
086    return valueFields.length == 1 && size() != valueFields.length;
087    }
088
089  public abstract int size();
090
091  /**
092   * Returns a Tuple Iterator for the given pipe position. Position 0 is the most left handed pipe passed to the prior
093   * {@link cascading.pipe.CoGroup}.
094   * <p/>
095   * To restart an Iterator over a given pipe, this method must be called again.
096   *
097   * @param pos of type int
098   * @return an Iterator of Tuple instances.
099   */
100  public abstract Iterator<Tuple> getIterator( int pos );
101
102  public abstract boolean isEmpty( int pos );
103
104  public abstract Tuple getGroupTuple( Tuple keysTuple );
105  }