001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tuple;
023
024import java.io.Closeable;
025import java.io.IOException;
026import java.util.Collections;
027import java.util.Set;
028
029import cascading.flow.FlowProcess;
030import cascading.scheme.ConcreteCall;
031import cascading.scheme.Scheme;
032import cascading.util.CloseableIterator;
033import cascading.util.SingleCloseableInputIterator;
034import cascading.util.Util;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling
040 * {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} on every call to
041 * {@link #next()}. The behavior can be controlled via properties defined in {@link TupleEntrySchemeIteratorProps}.
042 * <p/>
043 * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
044 * {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method.
045 */
046public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator
047  {
048  /** Field LOG */
049  private static final Logger LOG = LoggerFactory.getLogger( TupleEntrySchemeIterator.class );
050
051  private final FlowProcess<? extends Config> flowProcess;
052  private final Scheme scheme;
053  private final CloseableIterator<Input> inputIterator;
054  private final Set<Class<? extends Exception>> permittedExceptions;
055  private ConcreteCall sourceCall;
056
057  private String identifier;
058  private boolean isComplete = false;
059  private boolean hasWaiting = false;
060  private TupleException currentException;
061
062  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input )
063    {
064    this( flowProcess, scheme, input, null );
065    }
066
067  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input, String identifier )
068    {
069    this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), identifier );
070    }
071
072  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator )
073    {
074    this( flowProcess, scheme, inputIterator, null );
075    }
076
077  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String identifier )
078    {
079    super( scheme.getSourceFields() );
080    this.flowProcess = flowProcess;
081    this.scheme = scheme;
082    this.inputIterator = inputIterator;
083    this.identifier = identifier;
084
085    Object permittedExceptions = flowProcess.getProperty( TupleEntrySchemeIteratorProps.PERMITTED_EXCEPTIONS );
086
087    if( permittedExceptions != null )
088      this.permittedExceptions = Util.asClasses( permittedExceptions.toString(), "unable to load permitted exception class" );
089    else
090      this.permittedExceptions = Collections.emptySet();
091
092    if( this.identifier == null || this.identifier.isEmpty() )
093      this.identifier = "'unknown'";
094
095    if( !inputIterator.hasNext() )
096      {
097      isComplete = true;
098      return;
099      }
100
101    sourceCall = new ConcreteCall();
102
103    sourceCall.setIncomingEntry( getTupleEntry() );
104    sourceCall.setInput( wrapInput( inputIterator.next() ) );
105
106    try
107      {
108      this.scheme.sourcePrepare( flowProcess, sourceCall );
109      }
110    catch( IOException exception )
111      {
112      throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception );
113      }
114    }
115
116  protected FlowProcess<? extends Config> getFlowProcess()
117    {
118    return flowProcess;
119    }
120
121  protected Input wrapInput( Input input )
122    {
123    return input;
124    }
125
126  @Override
127  public boolean hasNext()
128    {
129    if( currentException != null )
130      return true;
131
132    if( isComplete )
133      return false;
134
135    if( hasWaiting )
136      return true;
137
138    try
139      {
140      getNext();
141      }
142    catch( Exception exception )
143      {
144      if( identifier == null || identifier.isEmpty() )
145        identifier = "'unknown'";
146
147      if( permittedExceptions.contains( exception.getClass() ) )
148        {
149        LOG.warn( "Caught permitted exception while reading {}", identifier, exception );
150        return false;
151        }
152
153      currentException = new TupleException( "unable to read from input identifier: " + identifier, exception );
154
155      return true;
156      }
157
158    if( !hasWaiting )
159      isComplete = true;
160
161    return !isComplete;
162    }
163
164  private TupleEntry getNext() throws IOException
165    {
166    Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
167    hasWaiting = scheme.source( flowProcess, sourceCall );
168
169    while( !hasWaiting && inputIterator.hasNext() )
170      {
171      sourceCall.setInput( wrapInput( inputIterator.next() ) );
172
173      try
174        {
175        this.scheme.sourceRePrepare( flowProcess, sourceCall );
176        }
177      catch( IOException exception )
178        {
179        throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception );
180        }
181
182      Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
183      hasWaiting = scheme.source( flowProcess, sourceCall );
184      }
185
186    return getTupleEntry();
187    }
188
189  @Override
190  public TupleEntry next()
191    {
192    try
193      {
194      if( currentException != null )
195        throw currentException;
196      }
197    finally
198      {
199      currentException = null; // data may be trapped
200      }
201
202    if( isComplete )
203      throw new IllegalStateException( "no next element" );
204
205    try
206      {
207      if( hasWaiting )
208        return getTupleEntry();
209
210      return getNext();
211      }
212    catch( Exception exception )
213      {
214      throw new TupleException( "unable to source from input identifier: " + identifier, exception );
215      }
216    finally
217      {
218      hasWaiting = false;
219      }
220    }
221
222  @Override
223  public void remove()
224    {
225    throw new UnsupportedOperationException( "may not remove elements from this iterator" );
226    }
227
228  @Override
229  public void close() throws IOException
230    {
231    try
232      {
233      if( sourceCall != null )
234        scheme.sourceCleanup( flowProcess, sourceCall );
235      }
236    finally
237      {
238      inputIterator.close();
239      }
240    }
241  }