001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tap.hadoop.io; 022 023 import java.io.IOException; 024 025 import cascading.flow.FlowProcess; 026 import cascading.flow.hadoop.util.HadoopUtil; 027 import cascading.tap.Tap; 028 import cascading.tap.TapException; 029 import cascading.tuple.Tuple; 030 import cascading.util.CloseableIterator; 031 import org.apache.hadoop.mapred.InputFormat; 032 import org.apache.hadoop.mapred.InputSplit; 033 import org.apache.hadoop.mapred.JobConf; 034 import org.apache.hadoop.mapred.JobConfigurable; 035 import org.apache.hadoop.mapred.RecordReader; 036 import org.apache.hadoop.mapred.Reporter; 037 import org.slf4j.Logger; 038 import org.slf4j.LoggerFactory; 039 040 /** 041 * Class TapIterator is an implementation of {@link cascading.util.CloseableIterator}. It is returned by {@link cascading.tap.Tap} instances when 042 * opening the taps resource for reading. 043 */ 044 public class MultiRecordReaderIterator implements CloseableIterator<RecordReader> 045 { 046 /** Field LOG */ 047 private static final Logger LOG = LoggerFactory.getLogger( MultiRecordReaderIterator.class ); 048 049 private final FlowProcess<JobConf> flowProcess; 050 /** Field tap */ 051 private final Tap tap; 052 /** Field inputFormat */ 053 private InputFormat inputFormat; 054 /** Field conf */ 055 private JobConf conf; 056 /** Field splits */ 057 private InputSplit[] splits; 058 /** Field reader */ 059 private RecordReader reader; 060 061 /** Field lastReader */ 062 private RecordReader lastReader; 063 064 /** Field currentSplit */ 065 private int currentSplit = 0; 066 /** Field complete */ 067 private boolean complete = false; 068 069 /** 070 * Constructor TapIterator creates a new TapIterator instance. 071 * 072 * @throws IOException when 073 */ 074 public MultiRecordReaderIterator( FlowProcess<JobConf> flowProcess, Tap tap ) throws IOException 075 { 076 this.flowProcess = flowProcess; 077 this.tap = tap; 078 this.conf = flowProcess.getConfigCopy(); 079 080 initialize(); 081 } 082 083 private void initialize() throws IOException 084 { 085 // prevent collisions of configuration properties set client side if now cluster side 086 String property = flowProcess.getStringProperty( "cascading.step.accumulated.source.conf." + Tap.id( tap ) ); 087 088 if( property == null ) 089 { 090 // default behavior is to accumulate paths, so remove any set prior 091 conf = HadoopUtil.removePropertiesFrom( conf, "mapred.input.dir", "mapreduce.input.fileinputformat.inputdir" ); // hadoop2 092 tap.sourceConfInit( flowProcess, conf ); 093 } 094 095 inputFormat = conf.getInputFormat(); 096 097 if( inputFormat instanceof JobConfigurable ) 098 ( (JobConfigurable) inputFormat ).configure( conf ); 099 100 // do not test for existence, let hadoop decide how to handle the given path 101 // this delegates globbing to the inputformat on split generation. 102 splits = inputFormat.getSplits( conf, 1 ); 103 104 if( splits.length == 0 ) 105 complete = true; 106 } 107 108 private RecordReader makeReader( int currentSplit ) throws IOException 109 { 110 LOG.debug( "reading split: {}", currentSplit ); 111 112 return inputFormat.getRecordReader( splits[ currentSplit ], conf, Reporter.NULL ); 113 } 114 115 /** 116 * Method hasNext returns true if there more {@link Tuple} instances available. 117 * 118 * @return boolean 119 */ 120 public boolean hasNext() 121 { 122 getNextReader(); 123 124 return !complete; 125 } 126 127 /** 128 * Method next returns the next {@link Tuple}. 129 * 130 * @return Tuple 131 */ 132 public RecordReader next() 133 { 134 if( complete ) 135 throw new IllegalStateException( "no more values" ); 136 137 try 138 { 139 getNextReader(); 140 141 return reader; 142 } 143 finally 144 { 145 reader = null; 146 } 147 } 148 149 private void getNextReader() 150 { 151 if( complete || reader != null ) 152 return; 153 154 try 155 { 156 if( currentSplit < splits.length ) 157 { 158 if( lastReader != null ) 159 lastReader.close(); 160 161 reader = makeReader( currentSplit++ ); 162 lastReader = reader; 163 } 164 else 165 { 166 complete = true; 167 } 168 } 169 catch( IOException exception ) 170 { 171 throw new TapException( "could not get next tuple", exception ); 172 } 173 } 174 175 public void remove() 176 { 177 throw new UnsupportedOperationException( "unimplemented" ); 178 } 179 180 @Override 181 public void close() throws IOException 182 { 183 if( lastReader != null ) 184 lastReader.close(); 185 } 186 }