001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tuple;
022
023import java.io.IOException;
024
025/**
026 * Interface TupleEntryCollector is used to allow {@link cascading.operation.BaseOperation} instances to emit
027 * one or more result {@link Tuple} values.
028 * <p/>
029 * The general rule in Cascading is if you are handed a Tuple, you cannot change or cache it. Attempts at modifying
030 * such a Tuple will result in an Exception. Preventing caching is harder, see below.
031 * <p/>
032 * If you create the Tuple, you can re-use or modify it.
033 * <p/>
034 * When calling {@link #add(Tuple)} or {@link #add(TupleEntry)}, you are passing a Tuple to the down stream pipes and
035 * operations. Since no downstream operation may modify or cache the Tuple instance, it is safe to re-use the Tuple
036 * instance when {@code add()} returns.
037 * <p/>
038 * That said, Tuple copies do get cached in order to perform specific operations in the underlying platforms. Currently
039 * only a shallow copy is made (via the {@link Tuple} copy constructor). Thus, any mutable type or collection
040 * placed inside a Tuple will not be copied, but will likely be cached if a copy of the Tuple passed downstream is
041 * copied.
042 * <p/>
043 * So any subsequent changes to that nested type or collection will be reflected in the cached copy, a likely
044 * source of hard to find errors.
045 * <p/>
046 * There is currently no way to specify that a deep copy must be performed when making a Tuple copy.
047 */
048public abstract class TupleEntryCollector
049  {
050  protected TupleEntry tupleEntry = new TupleEntry( Fields.UNKNOWN, null, true );
051
052  protected TupleEntryCollector()
053    {
054    }
055
056  /**
057   * Constructor TupleCollector creates a new TupleCollector instance.
058   *
059   * @param declared of type Fields
060   */
061  public TupleEntryCollector( Fields declared )
062    {
063    setFields( declared );
064    }
065
066  public void setFields( Fields declared )
067    {
068    if( declared == null )
069      throw new IllegalArgumentException( "declared fields must not be null" );
070
071    if( declared.isUnknown() || declared.isAll() )
072      return;
073
074    this.tupleEntry = new TupleEntry( declared, Tuple.size( declared.size() ), true );
075    }
076
077  /**
078   * Method add inserts the given {@link TupleEntry} into the outgoing stream. Note the method {@link #add(Tuple)} is
079   * more efficient as it simply calls {@link TupleEntry#getTuple()};
080   * <p/>
081   * See {@link cascading.tuple.TupleEntryCollector} on when and how to re-use a Tuple instance.
082   *
083   * @param tupleEntry of type TupleEntry
084   */
085  public void add( TupleEntry tupleEntry )
086    {
087    Fields expectedFields = this.tupleEntry.getFields();
088    TupleEntry outgoingEntry = this.tupleEntry;
089
090    if( expectedFields.isUnknown() || expectedFields.equals( tupleEntry.getFields() ) )
091      outgoingEntry = tupleEntry;
092    else
093      outgoingEntry.setTuple( selectTupleFrom( tupleEntry, expectedFields ) );
094
095    safeCollect( outgoingEntry );
096    }
097
098  private Tuple selectTupleFrom( TupleEntry tupleEntry, Fields expectedFields )
099    {
100    try
101      {
102      return tupleEntry.selectTuple( expectedFields );
103      }
104    catch( TupleException exception )
105      {
106      Fields givenFields = tupleEntry.getFields();
107      String string = "given TupleEntry fields: " + givenFields.printVerbose();
108      string += " do not match the operation declaredFields: " + expectedFields.printVerbose();
109      string += ", operations must emit tuples that match the fields they declare as output";
110
111      throw new TupleException( string, exception );
112      }
113    }
114
115  /**
116   * Method add inserts the given {@link Tuple} into the outgoing stream.
117   * <p/>
118   * See {@link cascading.tuple.TupleEntryCollector} on when and how to re-use a Tuple instance.
119   *
120   * @param tuple of type Tuple
121   */
122  public void add( Tuple tuple )
123    {
124    if( !tupleEntry.getFields().isUnknown() && tupleEntry.getFields().size() != tuple.size() )
125      throw new TupleException( "operation added the wrong number of fields, expected: " + tupleEntry.getFields().print() + ", got result size: " + tuple.size() );
126
127    boolean isUnmodifiable = tuple.isUnmodifiable();
128
129    tupleEntry.setTuple( tuple );
130
131    try
132      {
133      safeCollect( tupleEntry );
134      }
135    finally
136      {
137      Tuples.setUnmodifiable( tuple, isUnmodifiable );
138      }
139    }
140
141  private void safeCollect( TupleEntry tupleEntry )
142    {
143    try
144      {
145      collect( tupleEntry );
146      }
147    catch( IOException exception )
148      {
149      throw new TupleException( "unable to collect tuple", exception );
150      }
151    }
152
153  protected abstract void collect( TupleEntry tupleEntry ) throws IOException;
154
155  /**
156   * Method close closes the underlying resource being written to.
157   * <p/>
158   * This method should be called when when an instance is returned via
159   * {@link cascading.tap.Tap#openForWrite(cascading.flow.FlowProcess)}
160   * and no more {@link Tuple} instances will be written out.
161   * <p/>
162   * This method must not be called when an instance is returned from {@code getOutputCollector()} from any of
163   * the relevant {@link cascading.operation.OperationCall} implementations (inside a Function, Aggregator, or Buffer).
164   */
165  public void close()
166    {
167    // do nothing
168    }
169  }