001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tap.hadoop.io; 022 023import java.util.Iterator; 024 025import cascading.flow.FlowProcess; 026import cascading.tap.TapException; 027import cascading.tap.partition.Partition; 028import cascading.tuple.Fields; 029import cascading.tuple.Tuple; 030import cascading.tuple.TupleEntry; 031import cascading.tuple.TupleEntrySchemeIterator; 032import cascading.tuple.util.TupleViews; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.Path; 035 036public class CombineInputPartitionTupleEntryIterator implements Iterator<Tuple> 037 { 038 private final TupleEntrySchemeIterator childIterator; 039 private final FlowProcess<? extends Configuration> flowProcess; 040 private final TupleEntry partitionEntry; 041 private final Fields sourceFields; 042 private final Partition partition; 043 private final String parentIdentifier; 044 045 private Tuple base; 046 private Tuple view; 047 private String currentFile; 048 049 public CombineInputPartitionTupleEntryIterator( FlowProcess<? extends Configuration> flowProcess, Fields sourceFields, 050 Partition partition, String parentIdentifier, TupleEntrySchemeIterator childIterator ) 051 { 052 this.flowProcess = flowProcess; 053 this.partition = partition; 054 this.parentIdentifier = parentIdentifier; 055 this.childIterator = childIterator; 056 this.sourceFields = sourceFields; 057 this.partitionEntry = new TupleEntry( partition.getPartitionFields(), Tuple.size( partition.getPartitionFields().size() ) ); 058 } 059 060 @Override 061 public boolean hasNext() 062 { 063 return childIterator.hasNext(); 064 } 065 066 @Override 067 public Tuple next() 068 { 069 String currentFile = getCurrentFile(); 070 if( this.currentFile == null || !this.currentFile.equals( currentFile ) ) 071 { 072 this.currentFile = currentFile; 073 074 try 075 { 076 String childIdentifier = new Path( currentFile ).getParent().toString(); // drop part-xxxx 077 partition.toTuple( childIdentifier.substring( parentIdentifier.length() + 1 ), partitionEntry ); 078 } 079 catch( Exception exception ) 080 { 081 throw new TapException( "unable to parse partition given parent: " + parentIdentifier + " and child: " + currentFile, exception ); 082 } 083 084 base = TupleViews.createOverride( sourceFields, partitionEntry.getFields() ); 085 086 TupleViews.reset( base, Tuple.size( sourceFields.size() ), partitionEntry.getTuple() ); 087 088 view = TupleViews.createOverride( sourceFields, childIterator.getFields() ); 089 } 090 091 Tuple tuple = childIterator.next().getTuple(); 092 TupleViews.reset( view, base, tuple ); 093 094 return view; 095 } 096 097 private String getCurrentFile() 098 { 099 String result = flowProcess.getStringProperty( "mapreduce.map.input.file" ); 100 101 if( result == null ) 102 result = flowProcess.getStringProperty( "map.input.file" ); 103 104 return result; 105 } 106 107 @Override 108 public void remove() 109 { 110 childIterator.remove(); 111 } 112 }