001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.element;
023
024import java.util.concurrent.Callable;
025
026import cascading.CascadingException;
027import cascading.flow.FlowProcess;
028import cascading.flow.SliceCounters;
029import cascading.flow.StepCounters;
030import cascading.flow.stream.StopDataNotificationException;
031import cascading.flow.stream.duct.Duct;
032import cascading.flow.stream.duct.DuctException;
033import cascading.tap.Tap;
034import cascading.tuple.TupleEntry;
035import cascading.tuple.TupleEntryIterator;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 *
041 */
042public class SourceStage extends ElementStage<Void, TupleEntry> implements Callable<Throwable>, InputSource
043  {
044  private static final Logger LOG = LoggerFactory.getLogger( SourceStage.class );
045
046  private final Tap source;
047
048  public SourceStage( FlowProcess flowProcess, Tap source )
049    {
050    super( flowProcess, source );
051    this.source = source;
052    }
053
054  public Tap getSource()
055    {
056    return source;
057    }
058
059  @Override
060  public Throwable call() throws Exception
061    {
062    return map( null );
063    }
064
065  @Override
066  public void run( Object input ) throws Throwable
067    {
068    Throwable throwable = map( input );
069
070    if( throwable != null )
071      throw throwable;
072    }
073
074  private Throwable map( Object input )
075    {
076    Throwable localThrowable = null;
077    TupleEntryIterator iterator = null;
078
079    try
080      {
081      next.start( this );
082
083      // input may be null
084      iterator = source.openForRead( flowProcess, input );
085
086      while( iterator.hasNext() )
087        {
088        if( Thread.interrupted() )
089          throw new InterruptedException( "thread interrupted" );
090
091        TupleEntry tupleEntry;
092
093        try
094          {
095          tupleEntry = timedNext( StepCounters.Read_Duration, iterator );
096          flowProcess.increment( StepCounters.Tuples_Read, 1 );
097          flowProcess.increment( SliceCounters.Tuples_Read, 1 );
098          }
099        catch( OutOfMemoryError error )
100          {
101          handleReThrowableException( "out of memory, try increasing task memory allocation", error );
102          continue;
103          }
104        catch( CascadingException exception )
105          {
106          handleException( exception, null );
107          continue;
108          }
109        catch( Throwable throwable )
110          {
111          handleException( new DuctException( "internal error", throwable ), null );
112          continue;
113          }
114
115        try
116          {
117          next.receive( this, 0, tupleEntry );
118          }
119        catch( StopDataNotificationException exception )
120          {
121          LOG.info( "received stop data notification: {}", exception.getMessage() );
122          break;
123          }
124        }
125
126      next.complete( this );
127      }
128    catch( InterruptedException exception )
129      {
130      // do nothing -- let finally run
131      }
132    catch( Throwable throwable )
133      {
134      if( !( throwable instanceof OutOfMemoryError ) )
135        LOG.error( "caught throwable", throwable );
136
137      return throwable;
138      }
139    finally
140      {
141      try
142        {
143        if( iterator != null )
144          iterator.close();
145        }
146      catch( Throwable currentThrowable )
147        {
148        if( !( currentThrowable instanceof OutOfMemoryError ) )
149          LOG.warn( "failed closing iterator", currentThrowable );
150
151        localThrowable = currentThrowable;
152        }
153      }
154
155    return localThrowable;
156    }
157
158  private TupleEntry timedNext( StepCounters durationCounter, TupleEntryIterator iterator )
159    {
160    long start = System.currentTimeMillis();
161
162    try
163      {
164      return iterator.next();
165      }
166    finally
167      {
168      flowProcess.increment( durationCounter, System.currentTimeMillis() - start );
169      }
170    }
171
172  @Override
173  public void initialize()
174    {
175    }
176
177  @Override
178  public void receive( Duct previous, int ordinal, Void nada )
179    {
180    throw new UnsupportedOperationException( "use call() instead" );
181    }
182  }