001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.element;
023
024import java.util.concurrent.Callable;
025
026import cascading.CascadingException;
027import cascading.flow.FlowProcess;
028import cascading.flow.SliceCounters;
029import cascading.flow.StepCounters;
030import cascading.flow.stream.duct.Duct;
031import cascading.flow.stream.duct.DuctException;
032import cascading.tap.Tap;
033import cascading.tuple.TupleEntry;
034import cascading.tuple.TupleEntryIterator;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 *
040 */
041public class SourceStage extends ElementStage<Void, TupleEntry> implements Callable<Throwable>, InputSource
042  {
043  private static final Logger LOG = LoggerFactory.getLogger( SourceStage.class );
044
045  private final Tap source;
046
047  public SourceStage( FlowProcess flowProcess, Tap source )
048    {
049    super( flowProcess, source );
050    this.source = source;
051    }
052
053  public Tap getSource()
054    {
055    return source;
056    }
057
058  @Override
059  public Throwable call() throws Exception
060    {
061    return map( null );
062    }
063
064  @Override
065  public void run( Object input ) throws Throwable
066    {
067    Throwable throwable = map( input );
068
069    if( throwable != null )
070      throw throwable;
071    }
072
073  private Throwable map( Object input )
074    {
075    Throwable localThrowable = null;
076    TupleEntryIterator iterator = null;
077
078    try
079      {
080      next.start( this );
081
082      // input may be null
083      iterator = source.openForRead( flowProcess, input );
084
085      while( iterator.hasNext() )
086        {
087        TupleEntry tupleEntry;
088
089        try
090          {
091          tupleEntry = iterator.next();
092          flowProcess.increment( StepCounters.Tuples_Read, 1 );
093          flowProcess.increment( SliceCounters.Tuples_Read, 1 );
094          }
095        catch( OutOfMemoryError error )
096          {
097          handleReThrowableException( "out of memory, try increasing task memory allocation", error );
098          continue;
099          }
100        catch( CascadingException exception )
101          {
102          handleException( exception, null );
103          continue;
104          }
105        catch( Throwable throwable )
106          {
107          handleException( new DuctException( "internal error", throwable ), null );
108          continue;
109          }
110
111        next.receive( this, 0, tupleEntry );
112        }
113
114      next.complete( this );
115      }
116    catch( Throwable throwable )
117      {
118      if( !( throwable instanceof OutOfMemoryError ) )
119        LOG.error( "caught throwable", throwable );
120
121      return throwable;
122      }
123    finally
124      {
125      try
126        {
127        if( iterator != null )
128          iterator.close();
129        }
130      catch( Throwable currentThrowable )
131        {
132        if( !( currentThrowable instanceof OutOfMemoryError ) )
133          LOG.warn( "failed closing iterator", currentThrowable );
134
135        localThrowable = currentThrowable;
136        }
137      }
138
139    return localThrowable;
140    }
141
142  @Override
143  public void initialize()
144    {
145    }
146
147  @Override
148  public void receive( Duct previous, int ordinal, Void nada )
149    {
150    throw new UnsupportedOperationException( "use call() instead" );
151    }
152  }