001/* 002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.tuple; 023 024import java.io.Closeable; 025import java.io.IOException; 026import java.util.Collections; 027import java.util.Set; 028import java.util.function.Supplier; 029 030import cascading.flow.FlowProcess; 031import cascading.scheme.ConcreteCall; 032import cascading.scheme.Scheme; 033import cascading.tap.Tap; 034import cascading.util.CloseableIterator; 035import cascading.util.SingleCloseableInputIterator; 036import cascading.util.Util; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling 042 * {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} on every call to 043 * {@link #next()}. The behavior can be controlled via properties defined in {@link TupleEntrySchemeIteratorProps}. 044 * <p> 045 * Use this class inside a custom {@link cascading.tap.Tap} when overriding the 046 * {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method. 047 */ 048public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator 049 { 050 /** Field LOG */ 051 private static final Logger LOG = LoggerFactory.getLogger( TupleEntrySchemeIterator.class ); 052 053 private final FlowProcess<? extends Config> flowProcess; 054 private final Scheme<Config, Input, ?, Object, ?> scheme; 055 private final CloseableIterator<Input> inputIterator; 056 private final Set<Class<? extends Exception>> permittedExceptions; 057 private ConcreteCall sourceCall; 058 059 private Supplier<String> loggableIdentifier = () -> "'unknown'"; 060 private boolean isComplete = false; 061 private boolean hasWaiting = false; 062 private TupleException currentException; 063 064 @Deprecated 065 public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input ) 066 { 067 this( flowProcess, scheme, input, null ); 068 } 069 070 @Deprecated 071 public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input, String loggableIdentifier ) 072 { 073 this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), loggableIdentifier ); 074 } 075 076 @Deprecated 077 public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator ) 078 { 079 this( flowProcess, scheme, inputIterator, null ); 080 } 081 082 @Deprecated 083 public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String loggableIdentifier ) 084 { 085 this( flowProcess, null, scheme, inputIterator, loggableIdentifier ); 086 } 087 088 public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, Input input ) 089 { 090 this( flowProcess, tap, scheme, input, (Supplier<String>) null ); 091 } 092 093 public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, Input input, String loggableIdentifier ) 094 { 095 this( flowProcess, tap, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), loggableIdentifier ); 096 } 097 098 public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, Input input, Supplier<String> loggableIdentifier ) 099 { 100 this( flowProcess, tap, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), loggableIdentifier ); 101 } 102 103 public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, CloseableIterator<Input> inputIterator ) 104 { 105 this( flowProcess, tap, scheme, inputIterator, (Supplier<String>) null ); 106 } 107 108 public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, CloseableIterator<Input> inputIterator, String loggableIdentifier ) 109 { 110 this( flowProcess, tap, scheme, inputIterator, loggableIdentifier == null ? null : () -> loggableIdentifier ); 111 } 112 113 public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Tap tap, Scheme scheme, CloseableIterator<Input> inputIterator, Supplier<String> loggableIdentifier ) 114 { 115 super( scheme.getSourceFields() ); 116 this.flowProcess = flowProcess; 117 this.scheme = scheme; 118 this.inputIterator = inputIterator; 119 120 Object permittedExceptions = flowProcess.getProperty( TupleEntrySchemeIteratorProps.PERMITTED_EXCEPTIONS ); 121 122 if( permittedExceptions != null ) 123 this.permittedExceptions = Util.asClasses( permittedExceptions.toString(), "unable to load permitted exception class" ); 124 else 125 this.permittedExceptions = Collections.emptySet(); 126 127 // honor provided loggableIdentifier value 128 if( tap != null && loggableIdentifier == null ) 129 this.loggableIdentifier = tap::getIdentifier; 130 else if( loggableIdentifier != null ) 131 this.loggableIdentifier = loggableIdentifier; 132 133 if( !inputIterator.hasNext() ) 134 { 135 isComplete = true; 136 return; 137 } 138 139 sourceCall = createSourceCall(); 140 141 sourceCall.setTap( tap ); 142 sourceCall.setIncomingEntry( getTupleEntry() ); 143 sourceCall.setInput( wrapInput( inputIterator.next() ) ); 144 145 try 146 { 147 this.scheme.sourcePrepare( flowProcess, sourceCall ); 148 } 149 catch( IOException exception ) 150 { 151 throw new TupleException( "unable to prepare source for input identifier: " + this.loggableIdentifier.get(), exception ); 152 } 153 } 154 155 /** 156 * Override to provide custom ConcreteCall implementation to expose Tap level resources to the underlying Scheme. 157 * 158 * @return a new ConcreteCall instance 159 */ 160 protected <Context, IO> ConcreteCall<Context, IO> createSourceCall() 161 { 162 return new ConcreteCall<>(); 163 } 164 165 protected FlowProcess<? extends Config> getFlowProcess() 166 { 167 return flowProcess; 168 } 169 170 protected Input wrapInput( Input input ) 171 { 172 try 173 { 174 return scheme.sourceWrap( flowProcess, input ); 175 } 176 catch( IOException exception ) 177 { 178 throw new TupleException( "unable to wrap source for input identifier: " + this.loggableIdentifier.get(), exception ); 179 } 180 } 181 182 @Override 183 public boolean hasNext() 184 { 185 if( currentException != null ) 186 return true; 187 188 if( isComplete ) 189 return false; 190 191 if( hasWaiting ) 192 return true; 193 194 try 195 { 196 getNext(); 197 } 198 catch( Exception exception ) 199 { 200 if( permittedExceptions.contains( exception.getClass() ) ) 201 { 202 LOG.warn( "Caught permitted exception while reading {}", loggableIdentifier.get(), exception ); 203 return false; 204 } 205 206 currentException = new TupleException( "unable to read from input identifier: " + loggableIdentifier.get(), exception ); 207 208 return true; 209 } 210 211 if( !hasWaiting ) 212 isComplete = true; 213 214 return !isComplete; 215 } 216 217 private TupleEntry getNext() throws IOException 218 { 219 Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() ); 220 hasWaiting = scheme.source( flowProcess, sourceCall ); 221 222 while( !hasWaiting && inputIterator.hasNext() ) 223 { 224 sourceCall.setInput( wrapInput( inputIterator.next() ) ); 225 226 try 227 { 228 scheme.sourceRePrepare( flowProcess, sourceCall ); 229 } 230 catch( IOException exception ) 231 { 232 throw new TupleException( "unable to prepare source for input identifier: " + loggableIdentifier.get(), exception ); 233 } 234 235 Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() ); 236 hasWaiting = scheme.source( flowProcess, sourceCall ); 237 } 238 239 return getTupleEntry(); 240 } 241 242 @Override 243 public TupleEntry next() 244 { 245 try 246 { 247 if( currentException != null ) 248 throw currentException; 249 } 250 finally 251 { 252 currentException = null; // data may be trapped 253 } 254 255 if( isComplete ) 256 throw new IllegalStateException( "no next element" ); 257 258 try 259 { 260 if( hasWaiting ) 261 return getTupleEntry(); 262 263 return getNext(); 264 } 265 catch( Exception exception ) 266 { 267 throw new TupleException( "unable to source from input identifier: " + loggableIdentifier.get(), exception ); 268 } 269 finally 270 { 271 hasWaiting = false; 272 } 273 } 274 275 @Override 276 public void remove() 277 { 278 throw new UnsupportedOperationException( "may not remove elements from this iterator" ); 279 } 280 281 @Override 282 public void close() throws IOException 283 { 284 try 285 { 286 if( sourceCall != null ) 287 scheme.sourceCleanup( flowProcess, sourceCall ); 288 } 289 finally 290 { 291 inputIterator.close(); 292 } 293 } 294 }