001/* 002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.stream.element; 023 024import java.util.concurrent.Callable; 025 026import cascading.CascadingException; 027import cascading.flow.FlowProcess; 028import cascading.flow.SliceCounters; 029import cascading.flow.StepCounters; 030import cascading.flow.stream.StopDataNotificationException; 031import cascading.flow.stream.duct.Duct; 032import cascading.flow.stream.duct.DuctException; 033import cascading.tap.Tap; 034import cascading.tuple.TupleEntry; 035import cascading.tuple.TupleEntryIterator; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * 041 */ 042public class SourceStage extends ElementStage<Void, TupleEntry> implements Callable<Throwable>, InputSource 043 { 044 private static final Logger LOG = LoggerFactory.getLogger( SourceStage.class ); 045 046 private final Tap source; 047 048 public SourceStage( FlowProcess flowProcess, Tap source ) 049 { 050 super( flowProcess, source ); 051 this.source = source; 052 } 053 054 public Tap getSource() 055 { 056 return source; 057 } 058 059 @Override 060 public Throwable call() throws Exception 061 { 062 return map( null ); 063 } 064 065 @Override 066 public void run( Object input ) throws Throwable 067 { 068 Throwable throwable = map( input ); 069 070 if( throwable != null ) 071 throw throwable; 072 } 073 074 private Throwable map( Object input ) 075 { 076 Throwable localThrowable = null; 077 TupleEntryIterator iterator = null; 078 079 try 080 { 081 next.start( this ); 082 083 // input may be null 084 iterator = source.openForRead( flowProcess, input ); 085 086 while( iterator.hasNext() ) 087 { 088 if( Thread.interrupted() ) 089 throw new InterruptedException( "thread interrupted" ); 090 091 TupleEntry tupleEntry; 092 093 try 094 { 095 tupleEntry = timedNext( StepCounters.Read_Duration, iterator ); 096 flowProcess.increment( StepCounters.Tuples_Read, 1 ); 097 flowProcess.increment( SliceCounters.Tuples_Read, 1 ); 098 } 099 catch( OutOfMemoryError error ) 100 { 101 handleReThrowableException( "out of memory, try increasing task memory allocation", error ); 102 continue; 103 } 104 catch( CascadingException exception ) 105 { 106 handleException( exception, null ); 107 continue; 108 } 109 catch( Throwable throwable ) 110 { 111 handleException( new DuctException( "internal error", throwable ), null ); 112 continue; 113 } 114 115 try 116 { 117 next.receive( this, 0, tupleEntry ); 118 } 119 catch( StopDataNotificationException exception ) 120 { 121 LOG.info( "received stop data notification: {}", exception.getMessage() ); 122 break; 123 } 124 } 125 126 next.complete( this ); 127 } 128 catch( InterruptedException exception ) 129 { 130 // do nothing -- let finally run 131 } 132 catch( Throwable throwable ) 133 { 134 if( !( throwable instanceof OutOfMemoryError ) ) 135 LOG.error( "caught throwable", throwable ); 136 137 return throwable; 138 } 139 finally 140 { 141 try 142 { 143 if( iterator != null ) 144 iterator.close(); 145 } 146 catch( Throwable currentThrowable ) 147 { 148 if( !( currentThrowable instanceof OutOfMemoryError ) ) 149 LOG.warn( "failed closing iterator", currentThrowable ); 150 151 localThrowable = currentThrowable; 152 } 153 } 154 155 return localThrowable; 156 } 157 158 private TupleEntry timedNext( StepCounters durationCounter, TupleEntryIterator iterator ) 159 { 160 long start = System.currentTimeMillis(); 161 162 try 163 { 164 return iterator.next(); 165 } 166 finally 167 { 168 flowProcess.increment( durationCounter, System.currentTimeMillis() - start ); 169 } 170 } 171 172 @Override 173 public void initialize() 174 { 175 } 176 177 @Override 178 public void receive( Duct previous, int ordinal, Void nada ) 179 { 180 throw new UnsupportedOperationException( "use call() instead" ); 181 } 182 }