001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tuple;
023
024import java.io.Closeable;
025import java.io.IOException;
026import java.util.Collections;
027import java.util.Set;
028import java.util.function.Supplier;
029
030import cascading.flow.FlowProcess;
031import cascading.scheme.ConcreteCall;
032import cascading.scheme.Scheme;
033import cascading.tap.Tap;
034import cascading.util.CloseableIterator;
035import cascading.util.SingleCloseableInputIterator;
036import cascading.util.Util;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling
042 * {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} on every call to
043 * {@link #next()}. The behavior can be controlled via properties defined in {@link TupleEntrySchemeIteratorProps}.
044 * <p>
045 * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
046 * {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method.
047 */
048public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator
049  {
050  /** Field LOG */
051  private static final Logger LOG = LoggerFactory.getLogger( TupleEntrySchemeIterator.class );
052
053  private final FlowProcess<? extends Config> flowProcess;
054  private final Scheme<Config, Input, ?, Object, ?> scheme;
055  private final CloseableIterator<Input> inputIterator;
056  private final Set<Class<? extends Exception>> permittedExceptions;
057  private ConcreteCall sourceCall;
058
059  private Supplier<String> loggableIdentifier = () -> "'unknown'";
060  private boolean isComplete = false;
061  private boolean hasWaiting = false;
062  private TupleException currentException;
063
064  @Deprecated
065  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input )
066    {
067    this( flowProcess, scheme, input, null );
068    }
069
070  @Deprecated
071  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input, String loggableIdentifier )
072    {
073    this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), loggableIdentifier );
074    }
075
076  @Deprecated
077  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator )
078    {
079    this( flowProcess, scheme, inputIterator, null );
080    }
081
082  @Deprecated
083  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String loggableIdentifier )
084    {
085    this( flowProcess, null, scheme, inputIterator, loggableIdentifier );
086    }
087
088  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, Input input )
089    {
090    this( flowProcess, tap, scheme, input, (Supplier<String>) null );
091    }
092
093  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, Input input, String loggableIdentifier )
094    {
095    this( flowProcess, tap, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), loggableIdentifier );
096    }
097
098  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, Input input, Supplier<String> loggableIdentifier )
099    {
100    this( flowProcess, tap, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), loggableIdentifier );
101    }
102
103  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, CloseableIterator<Input> inputIterator )
104    {
105    this( flowProcess, tap, scheme, inputIterator, (Supplier<String>) null );
106    }
107
108  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, CloseableIterator<Input> inputIterator, String loggableIdentifier )
109    {
110    this( flowProcess, tap, scheme, inputIterator, loggableIdentifier == null ? null : () -> loggableIdentifier );
111    }
112
113  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, CloseableIterator<Input> inputIterator, Supplier<String> loggableIdentifier )
114    {
115    super( scheme.getSourceFields() );
116    this.flowProcess = flowProcess;
117    this.scheme = scheme;
118    this.inputIterator = inputIterator;
119
120    Object permittedExceptions = flowProcess.getProperty( TupleEntrySchemeIteratorProps.PERMITTED_EXCEPTIONS );
121
122    if( permittedExceptions != null )
123      this.permittedExceptions = Util.asClasses( permittedExceptions.toString(), "unable to load permitted exception class" );
124    else
125      this.permittedExceptions = Collections.emptySet();
126
127    // honor provided loggableIdentifier value
128    if( tap != null && loggableIdentifier == null )
129      this.loggableIdentifier = tap::getIdentifier;
130    else if( loggableIdentifier != null )
131      this.loggableIdentifier = loggableIdentifier;
132
133    if( !inputIterator.hasNext() )
134      {
135      isComplete = true;
136      return;
137      }
138
139    sourceCall = createSourceCall();
140
141    sourceCall.setTap( tap );
142    sourceCall.setIncomingEntry( getTupleEntry() );
143    sourceCall.setInput( wrapInput( inputIterator.next() ) );
144
145    try
146      {
147      this.scheme.sourcePrepare( flowProcess, sourceCall );
148      }
149    catch( IOException exception )
150      {
151      throw new TupleException( "unable to prepare source for input identifier: " + this.loggableIdentifier.get(), exception );
152      }
153    }
154
155  /**
156   * Override to provide custom ConcreteCall implementation to expose Tap level resources to the underlying Scheme.
157   *
158   * @return a new ConcreteCall instance
159   */
160  protected <Context, IO> ConcreteCall<Context, IO> createSourceCall()
161    {
162    return new ConcreteCall<>();
163    }
164
165  protected FlowProcess<? extends Config> getFlowProcess()
166    {
167    return flowProcess;
168    }
169
170  protected Input wrapInput( Input input )
171    {
172    try
173      {
174      return scheme.sourceWrap( flowProcess, input );
175      }
176    catch( IOException exception )
177      {
178      throw new TupleException( "unable to wrap source for input identifier: " + this.loggableIdentifier.get(), exception );
179      }
180    }
181
182  @Override
183  public boolean hasNext()
184    {
185    if( currentException != null )
186      return true;
187
188    if( isComplete )
189      return false;
190
191    if( hasWaiting )
192      return true;
193
194    try
195      {
196      getNext();
197      }
198    catch( Exception exception )
199      {
200      if( permittedExceptions.contains( exception.getClass() ) )
201        {
202        LOG.warn( "Caught permitted exception while reading {}", loggableIdentifier.get(), exception );
203        return false;
204        }
205
206      currentException = new TupleException( "unable to read from input identifier: " + loggableIdentifier.get(), exception );
207
208      return true;
209      }
210
211    if( !hasWaiting )
212      isComplete = true;
213
214    return !isComplete;
215    }
216
217  private TupleEntry getNext() throws IOException
218    {
219    Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
220    hasWaiting = scheme.source( flowProcess, sourceCall );
221
222    while( !hasWaiting && inputIterator.hasNext() )
223      {
224      sourceCall.setInput( wrapInput( inputIterator.next() ) );
225
226      try
227        {
228        scheme.sourceRePrepare( flowProcess, sourceCall );
229        }
230      catch( IOException exception )
231        {
232        throw new TupleException( "unable to prepare source for input identifier: " + loggableIdentifier.get(), exception );
233        }
234
235      Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
236      hasWaiting = scheme.source( flowProcess, sourceCall );
237      }
238
239    return getTupleEntry();
240    }
241
242  @Override
243  public TupleEntry next()
244    {
245    try
246      {
247      if( currentException != null )
248        throw currentException;
249      }
250    finally
251      {
252      currentException = null; // data may be trapped
253      }
254
255    if( isComplete )
256      throw new IllegalStateException( "no next element" );
257
258    try
259      {
260      if( hasWaiting )
261        return getTupleEntry();
262
263      return getNext();
264      }
265    catch( Exception exception )
266      {
267      throw new TupleException( "unable to source from input identifier: " + loggableIdentifier.get(), exception );
268      }
269    finally
270      {
271      hasWaiting = false;
272      }
273    }
274
275  @Override
276  public void remove()
277    {
278    throw new UnsupportedOperationException( "may not remove elements from this iterator" );
279    }
280
281  @Override
282  public void close() throws IOException
283    {
284    try
285      {
286      if( sourceCall != null )
287        scheme.sourceCleanup( flowProcess, sourceCall );
288      }
289    finally
290      {
291      inputIterator.close();
292      }
293    }
294  }