001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple;
022    
023    import java.io.Closeable;
024    import java.io.IOException;
025    import java.util.Collections;
026    import java.util.Set;
027    
028    import cascading.flow.FlowProcess;
029    import cascading.scheme.ConcreteCall;
030    import cascading.scheme.Scheme;
031    import cascading.util.CloseableIterator;
032    import cascading.util.SingleCloseableInputIterator;
033    import cascading.util.Util;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    
037    /**
038     * Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling
039     * {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} on every call to
040     * {@link #next()}. The behavior can be controlled via properties defined in {@link TupleEntrySchemeIteratorProps}.
041     * <p/>
042     * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
043     * {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method.
044     */
045    public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator
046      {
047      /** Field LOG */
048      private static final Logger LOG = LoggerFactory.getLogger( TupleEntrySchemeIterator.class );
049    
050      private final FlowProcess<Config> flowProcess;
051      private final Scheme scheme;
052      private final CloseableIterator<Input> inputIterator;
053      private final Set<Class<? extends Exception>> permittedExceptions;
054      private ConcreteCall sourceCall;
055    
056      private String identifier;
057      private boolean isComplete = false;
058      private boolean hasWaiting = false;
059      private TupleException currentException;
060    
061      public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, Input input )
062        {
063        this( flowProcess, scheme, input, null );
064        }
065    
066      public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, Input input, String identifier )
067        {
068        this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), identifier );
069        }
070    
071      public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator )
072        {
073        this( flowProcess, scheme, inputIterator, null );
074        }
075    
076      public TupleEntrySchemeIterator( FlowProcess<Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String identifier )
077        {
078        super( scheme.getSourceFields() );
079        this.flowProcess = flowProcess;
080        this.scheme = scheme;
081        this.inputIterator = inputIterator;
082        this.identifier = identifier;
083    
084        Object permittedExceptions = flowProcess.getProperty( TupleEntrySchemeIteratorProps.PERMITTED_EXCEPTIONS );
085    
086        if( permittedExceptions != null )
087          this.permittedExceptions = Util.asClasses( permittedExceptions.toString(), "unable to load permitted exception class" );
088        else
089          this.permittedExceptions = Collections.emptySet();
090    
091        if( this.identifier == null || this.identifier.isEmpty() )
092          this.identifier = "'unknown'";
093    
094        if( !inputIterator.hasNext() )
095          {
096          isComplete = true;
097          return;
098          }
099    
100        sourceCall = new ConcreteCall();
101    
102        sourceCall.setIncomingEntry( getTupleEntry() );
103        sourceCall.setInput( wrapInput( inputIterator.next() ) );
104    
105        try
106          {
107          this.scheme.sourcePrepare( flowProcess, sourceCall );
108          }
109        catch( IOException exception )
110          {
111          throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception );
112          }
113        }
114    
115      protected FlowProcess<Config> getFlowProcess()
116        {
117        return flowProcess;
118        }
119    
120      protected Input wrapInput( Input input )
121        {
122        return input;
123        }
124    
125      @Override
126      public boolean hasNext()
127        {
128        if( currentException != null )
129          return true;
130    
131        if( isComplete )
132          return false;
133    
134        if( hasWaiting )
135          return true;
136    
137        try
138          {
139          getNext();
140          }
141        catch( Exception exception )
142          {
143          if( identifier == null || identifier.isEmpty() )
144            identifier = "'unknown'";
145    
146          if( permittedExceptions.contains( exception.getClass() ) )
147            {
148            LOG.warn( "Caught permitted exception while reading {}", identifier, exception );
149            return false;
150            }
151    
152          currentException = new TupleException( "unable to read from input identifier: " + identifier, exception );
153    
154          return true;
155          }
156    
157        if( !hasWaiting )
158          isComplete = true;
159    
160        return !isComplete;
161        }
162    
163      private TupleEntry getNext() throws IOException
164        {
165        Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
166        hasWaiting = scheme.source( flowProcess, sourceCall );
167    
168        if( !hasWaiting && inputIterator.hasNext() )
169          {
170          sourceCall.setInput( wrapInput( inputIterator.next() ) );
171    
172          return getNext();
173          }
174    
175        return getTupleEntry();
176        }
177    
178      @Override
179      public TupleEntry next()
180        {
181        try
182          {
183          if( currentException != null )
184            throw currentException;
185          }
186        finally
187          {
188          currentException = null; // data may be trapped
189          }
190    
191        if( isComplete )
192          throw new IllegalStateException( "no next element" );
193    
194        try
195          {
196          if( hasWaiting )
197            return getTupleEntry();
198    
199          return getNext();
200          }
201        catch( Exception exception )
202          {
203          throw new TupleException( "unable to source from input identifier: " + identifier, exception );
204          }
205        finally
206          {
207          hasWaiting = false;
208          }
209        }
210    
211      @Override
212      public void remove()
213        {
214        throw new UnsupportedOperationException( "may not remove elements from this iterator" );
215        }
216    
217      @Override
218      public void close() throws IOException
219        {
220        try
221          {
222          if( sourceCall != null )
223            scheme.sourceCleanup( flowProcess, sourceCall );
224          }
225        finally
226          {
227          inputIterator.close();
228          }
229        }
230      }