001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.util.concurrent.Callable;
024    
025    import cascading.CascadingException;
026    import cascading.flow.FlowProcess;
027    import cascading.flow.SliceCounters;
028    import cascading.flow.StepCounters;
029    import cascading.tap.Tap;
030    import cascading.tuple.TupleEntry;
031    import cascading.tuple.TupleEntryIterator;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    /**
036     *
037     */
038    public class SourceStage extends ElementStage<Void, TupleEntry> implements Callable<Throwable>
039      {
040      private static final Logger LOG = LoggerFactory.getLogger( SourceStage.class );
041    
042      private final Tap source;
043    
044      public SourceStage( FlowProcess flowProcess, Tap source )
045        {
046        super( flowProcess, source );
047        this.source = source;
048        }
049    
050      @Override
051      public Throwable call() throws Exception
052        {
053        return map( null );
054        }
055    
056      public void run( Object input ) throws Throwable
057        {
058        Throwable throwable = map( input );
059    
060        if( throwable != null )
061          throw throwable;
062        }
063    
064      private Throwable map( Object input )
065        {
066        Throwable localThrowable = null;
067        TupleEntryIterator iterator = null;
068    
069        try
070          {
071          next.start( this );
072    
073          // input may be null
074          iterator = source.openForRead( flowProcess, input );
075    
076          while( iterator.hasNext() )
077            {
078            TupleEntry tupleEntry;
079    
080            try
081              {
082              tupleEntry = iterator.next();
083              flowProcess.increment( StepCounters.Tuples_Read, 1 );
084              flowProcess.increment( SliceCounters.Tuples_Read, 1 );
085              }
086            catch( OutOfMemoryError error )
087              {
088              handleReThrowableException( "out of memory, try increasing task memory allocation", error );
089              continue;
090              }
091            catch( CascadingException exception )
092              {
093              handleException( exception, null );
094              continue;
095              }
096            catch( Throwable throwable )
097              {
098              handleException( new DuctException( "internal error", throwable ), null );
099              continue;
100              }
101    
102            next.receive( this, tupleEntry );
103            }
104    
105          next.complete( this );
106          }
107        catch( Throwable throwable )
108          {
109          if( !( throwable instanceof OutOfMemoryError ) )
110            LOG.error( "caught throwable", throwable );
111    
112          return throwable;
113          }
114        finally
115          {
116          try
117            {
118            if( iterator != null )
119              iterator.close();
120            }
121          catch( Throwable currentThrowable )
122            {
123            if( !( currentThrowable instanceof OutOfMemoryError ) )
124              LOG.warn( "failed closing iterator", currentThrowable );
125    
126            localThrowable = currentThrowable;
127            }
128          }
129    
130        return localThrowable;
131        }
132    
133      @Override
134      public void initialize()
135        {
136        }
137    
138      @Override
139      public void receive( Duct previous, Void nada )
140        {
141        throw new UnsupportedOperationException( "use call() instead" );
142        }
143      }