001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.tap.hadoop.io;
022
023import java.util.Iterator;
024
025import cascading.flow.FlowProcess;
026import cascading.tap.TapException;
027import cascading.tap.partition.Partition;
028import cascading.tuple.Fields;
029import cascading.tuple.Tuple;
030import cascading.tuple.TupleEntry;
031import cascading.tuple.TupleEntrySchemeIterator;
032import cascading.tuple.util.TupleViews;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.Path;
035
036public class CombineInputPartitionTupleEntryIterator implements Iterator<Tuple>
037  {
038  private final TupleEntrySchemeIterator childIterator;
039  private final FlowProcess<? extends Configuration> flowProcess;
040  private final TupleEntry partitionEntry;
041  private final Fields sourceFields;
042  private final Partition partition;
043  private final String parentIdentifier;
044
045  private Tuple base;
046  private Tuple view;
047  private String currentFile;
048
049  public CombineInputPartitionTupleEntryIterator( FlowProcess<? extends Configuration> flowProcess, Fields sourceFields,
050                                                  Partition partition, String parentIdentifier, TupleEntrySchemeIterator childIterator )
051    {
052    this.flowProcess = flowProcess;
053    this.partition = partition;
054    this.parentIdentifier = parentIdentifier;
055    this.childIterator = childIterator;
056    this.sourceFields = sourceFields;
057    this.partitionEntry = new TupleEntry( partition.getPartitionFields(), Tuple.size( partition.getPartitionFields().size() ) );
058    }
059
060  @Override
061  public boolean hasNext()
062    {
063    return childIterator.hasNext();
064    }
065
066  @Override
067  public Tuple next()
068    {
069    String currentFile = getCurrentFile();
070    if( this.currentFile == null || !this.currentFile.equals( currentFile ) )
071      {
072      this.currentFile = currentFile;
073
074      try
075        {
076        String childIdentifier = new Path( currentFile ).getParent().toString(); // drop part-xxxx
077        partition.toTuple( childIdentifier.substring( parentIdentifier.length() + 1 ), partitionEntry );
078        }
079      catch( Exception exception )
080        {
081        throw new TapException( "unable to parse partition given parent: " + parentIdentifier + " and child: " + currentFile, exception );
082        }
083
084      base = TupleViews.createOverride( sourceFields, partitionEntry.getFields() );
085
086      TupleViews.reset( base, Tuple.size( sourceFields.size() ), partitionEntry.getTuple() );
087
088      view = TupleViews.createOverride( sourceFields, childIterator.getFields() );
089      }
090
091    Tuple tuple = childIterator.next().getTuple();
092    TupleViews.reset( view, base, tuple );
093
094    return view;
095    }
096
097  private String getCurrentFile()
098    {
099    String result = flowProcess.getStringProperty( "mapreduce.map.input.file" );
100
101    if( result == null )
102      result = flowProcess.getStringProperty( "map.input.file" );
103
104    return result;
105    }
106
107  @Override
108  public void remove()
109    {
110    childIterator.remove();
111    }
112  }