/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;

import cascading.flow.FlowProcess;
import cascading.scheme.ConcreteCall;
import cascading.scheme.Scheme;
import cascading.util.CloseableIterator;
import cascading.util.SingleCloseableInputIterator;
import cascading.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class TupleEntrySchemeIterator is a helper class for wrapping a {@link Scheme} instance, calling
 * {@link Scheme#source(cascading.flow.FlowProcess, cascading.scheme.SourceCall)} on every call to
 * {@link #next()}. The behavior can be controlled via properties defined in {@link TupleEntrySchemeIteratorProps}.
 * <p>
 * Use this class inside a custom {@link cascading.tap.Tap} when overriding the
 * {@link cascading.tap.Tap#openForRead(cascading.flow.FlowProcess)} method.
 */
public class TupleEntrySchemeIterator<Config, Input> extends TupleEntryIterator
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( TupleEntrySchemeIterator.class );

  /** Process context handed to every {@link Scheme} call. */
  private final FlowProcess<? extends Config> flowProcess;
  /** Scheme used to prepare, source from and clean up each input. */
  private final Scheme scheme;
  /** Supplies the inputs to read from, one after another. */
  private final CloseableIterator<Input> inputIterator;
  /** Exception classes that end the iteration quietly instead of being surfaced to the caller. */
  private final Set<Class<? extends Exception>> permittedExceptions;
  /** Remains {@code null} when the input iterator was empty at construction time. */
  private ConcreteCall sourceCall;

  /** Label used in log and exception messages; never {@code null} or empty after construction. */
  private final String identifier;
  /** Set once no further tuples will be produced. */
  private boolean isComplete = false;
  /** True while a tuple read by {@link #hasNext()} has not yet been handed out by {@link #next()}. */
  private boolean hasWaiting = false;
  /** Failure captured by {@link #hasNext()} that the following {@link #next()} call will throw. */
  private TupleException currentException;

  /**
   * Creates an iterator over a single input, without an identifier.
   *
   * @param flowProcess the current process context
   * @param scheme      the scheme to source tuples with
   * @param input       the single input to read; it is cast to {@link Closeable}
   */
  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input )
    {
    this( flowProcess, scheme, input, null );
    }

  /**
   * Creates an iterator over a single input.
   *
   * @param flowProcess the current process context
   * @param scheme      the scheme to source tuples with
   * @param input       the single input to read; it is cast to {@link Closeable}, so a non-Closeable
   *                    input fails with a {@link ClassCastException}
   * @param identifier  label for the input used in messages, may be {@code null}
   */
  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, Input input, String identifier )
    {
    this( flowProcess, scheme, (CloseableIterator<Input>) new SingleCloseableInputIterator( (Closeable) input ), identifier );
    }

  /**
   * Creates an iterator over a series of inputs, without an identifier.
   *
   * @param flowProcess   the current process context
   * @param scheme        the scheme to source tuples with
   * @param inputIterator the inputs to read, in order
   */
  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator )
    {
    this( flowProcess, scheme, inputIterator, null );
    }

  /**
   * Creates an iterator over a series of inputs.
   * <p>
   * The permitted exception classes are read from the
   * {@link TupleEntrySchemeIteratorProps#PERMITTED_EXCEPTIONS} property of the given flowProcess. If the
   * input iterator is empty, this iterator is immediately complete and the scheme is never prepared.
   * Otherwise the first input is taken and {@link Scheme#sourcePrepare} is called.
   * <p>
   * Note that {@link #wrapInput(Object)} is invoked from this constructor, that is, before any subclass
   * fields have been initialized.
   *
   * @param flowProcess   the current process context
   * @param scheme        the scheme to source tuples with
   * @param inputIterator the inputs to read, in order
   * @param identifier    label for the input used in messages; {@code null} or empty becomes {@code 'unknown'}
   * @throws TupleException if the scheme fails to prepare the first input
   */
  public TupleEntrySchemeIterator( FlowProcess<? extends Config> flowProcess, Scheme scheme, CloseableIterator<Input> inputIterator, String identifier )
    {
    super( scheme.getSourceFields() );
    this.flowProcess = flowProcess;
    this.scheme = scheme;
    this.inputIterator = inputIterator;

    // normalize once so every message built later can rely on a usable label
    this.identifier = identifier == null || identifier.isEmpty() ? "'unknown'" : identifier;

    Object property = flowProcess.getProperty( TupleEntrySchemeIteratorProps.PERMITTED_EXCEPTIONS );

    if( property != null )
      this.permittedExceptions = Util.asClasses( property.toString(), "unable to load permitted exception class" );
    else
      this.permittedExceptions = Collections.emptySet();

    if( !inputIterator.hasNext() )
      {
      // nothing to read: sourceCall stays null, which close() checks before cleaning up
      isComplete = true;
      return;
      }

    sourceCall = new ConcreteCall();

    sourceCall.setIncomingEntry( getTupleEntry() );
    sourceCall.setInput( wrapInput( inputIterator.next() ) );

    try
      {
      this.scheme.sourcePrepare( flowProcess, sourceCall );
      }
    catch( IOException exception )
      {
      throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception );
      }
    }

  /**
   * Returns the process context this iterator was created with.
   *
   * @return the flow process
   */
  protected FlowProcess<? extends Config> getFlowProcess()
    {
    return flowProcess;
    }

  /**
   * Hook that lets subclasses decorate each input before it is handed to the scheme. This
   * implementation returns the input unchanged.
   *
   * @param input the next input taken from the input iterator
   * @return the input the scheme should read from
   */
  protected Input wrapInput( Input input )
    {
    return input;
    }

  /**
   * Reports whether {@link #next()} has something to deliver, reading ahead from the scheme if needed.
   * <p>
   * A read failure is not thrown here. It is stored and {@code true} is returned, so that the following
   * {@link #next()} call throws it. Failures whose class is listed as permitted are logged and end the
   * iteration instead.
   *
   * @return true if a tuple or a pending failure is waiting for {@link #next()}
   */
  @Override
  public boolean hasNext()
    {
    if( currentException != null )
      return true;

    if( isComplete )
      return false;

    if( hasWaiting )
      return true;

    try
      {
      getNext();
      }
    catch( Exception exception )
      {
      if( permittedExceptions.contains( exception.getClass() ) )
        {
        LOG.warn( "Caught permitted exception while reading {}", identifier, exception );

        // stay exhausted: without this a later hasNext()/next() would call the scheme again on the
        // input that just failed, even though the end of the iteration was already reported
        isComplete = true;

        return false;
        }

      currentException = new TupleException( "unable to read from input identifier: " + identifier, exception );

      return true;
      }

    if( !hasWaiting )
      isComplete = true;

    return !isComplete;
    }

  /**
   * Asks the scheme for the next tuple, moving on to the following inputs while the current one yields
   * nothing. The outcome is recorded in {@code hasWaiting}.
   *
   * @return the shared tuple entry, populated only if {@code hasWaiting} is true afterwards
   * @throws IOException    if the scheme fails while sourcing
   * @throws TupleException if the scheme fails to re-prepare for a following input
   */
  private TupleEntry getNext() throws IOException
    {
    // presumably makes the shared tuple writable again before the scheme fills it - confirm against Tuples
    Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
    hasWaiting = scheme.source( flowProcess, sourceCall );

    while( !hasWaiting && inputIterator.hasNext() )
      {
      sourceCall.setInput( wrapInput( inputIterator.next() ) );

      try
        {
        this.scheme.sourceRePrepare( flowProcess, sourceCall );
        }
      catch( IOException exception )
        {
        throw new TupleException( "unable to prepare source for input identifier: " + this.identifier, exception );
        }

      Tuples.asModifiable( sourceCall.getIncomingEntry().getTuple() );
      hasWaiting = scheme.source( flowProcess, sourceCall );
      }

    return getTupleEntry();
    }

  /**
   * Returns the next tuple entry, or throws the failure captured by the preceding {@link #hasNext()}.
   *
   * @return the shared tuple entry holding the next tuple
   * @throws TupleException        if a failure is pending or sourcing fails
   * @throws IllegalStateException if the iterator is complete
   */
  @Override
  public TupleEntry next()
    {
    if( currentException != null )
      {
      TupleException pending = currentException;

      currentException = null; // data may be trapped

      throw pending;
      }

    if( isComplete )
      throw new IllegalStateException( "no next element" );

    try
      {
      if( hasWaiting )
        return getTupleEntry();

      return getNext();
      }
    catch( Exception exception )
      {
      throw new TupleException( "unable to source from input identifier: " + identifier, exception );
      }
    finally
      {
      // the waiting tuple, if any, is consumed by this call
      hasWaiting = false;
      }
    }

  /**
   * Not supported.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void remove()
    {
    throw new UnsupportedOperationException( "may not remove elements from this iterator" );
    }

  /**
   * Lets the scheme clean up, if it was ever prepared, and then closes the input iterator. The input
   * iterator is closed even when the cleanup fails.
   *
   * @throws IOException if the cleanup or closing the inputs fails
   */
  @Override
  public void close() throws IOException
    {
    try
      {
      if( sourceCall != null )
        scheme.sourceCleanup( flowProcess, sourceCall );
      }
    finally
      {
      inputIterator.close();
      }
    }
  }