001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.util;
022
023import java.io.IOException;
024import java.util.Comparator;
025import java.util.Iterator;
026
027import cascading.CascadingException;
028import cascading.tuple.Tuple;
029import cascading.tuple.io.TuplePair;
030import org.apache.tez.runtime.library.api.KeyValuesReader;
031
032/**
033 *
034 */
035public class SecondarySortKeyValuesReader extends KeyValuesReader
036  {
037  private KeyValuesReader parent;
038  private Comparator<Tuple> groupComparator;
039  private Tuple currentKey;
040  private Iterable<Object> currentValues;
041  private boolean isNewKey = false;
042  private TuplePair currentKeyPair;
043
044  public SecondarySortKeyValuesReader( KeyValuesReader parent, Comparator<Tuple> groupComparator )
045    {
046    this.parent = parent;
047    this.groupComparator = groupComparator;
048    }
049
050  @Override
051  public boolean next() throws IOException
052    {
053    // next for forwarding to the next key
054    // so we must keep iterating across keys in the parent until we find a new
055    // grouping key without its accompanied sort values
056
057    if( parent != null && isNewKey )
058      {
059      isNewKey = false; // allow next next() to advance underlying iterator
060      return true;
061      }
062
063    boolean advanced = advance();
064
065    while( !isNewKey && advanced )
066      advanced = advance();
067
068    isNewKey = false;
069
070    return advanced;
071    }
072
073  protected boolean advance() throws IOException
074    {
075    if( parent == null )
076      return false;
077
078    boolean next = parent.next();
079
080    if( !next )
081      {
082      parent = null;
083      return false;
084      }
085
086    currentKeyPair = (TuplePair) parent.getCurrentKey();
087
088    isNewKey = currentKey == null || groupComparator.compare( currentKey, currentKeyPair.getLhs() ) != 0;
089    currentKey = currentKeyPair.getLhs();
090    currentValues = parent.getCurrentValues();
091
092    return true;
093    }
094
095  @Override
096  public Object getCurrentKey() throws IOException
097    {
098    return currentKeyPair;
099    }
100
101  @Override
102  public Iterable<Object> getCurrentValues() throws IOException
103    {
104    return new Iterable<Object>()
105      {
106      @Override
107      public Iterator<Object> iterator()
108        {
109        final Iterator<Object>[] iterator = new Iterator[]{currentValues.iterator()};
110
111        return new Iterator<Object>()
112          {
113          @Override
114          public boolean hasNext()
115            {
116            boolean hasNext = iterator[ 0 ].hasNext();
117
118            if( hasNext )
119              return true;
120
121            if( !advanceSafe() )
122              return false;
123
124            if( isNewKey )
125              return false;
126
127            iterator[ 0 ] = currentValues.iterator();
128
129            return hasNext();
130            }
131
132          @Override
133          public Object next()
134            {
135            return iterator[ 0 ].next();
136            }
137
138          @Override
139          public void remove()
140            {
141            iterator[ 0 ].remove();
142            }
143
144          protected boolean advanceSafe()
145            {
146            try
147              {
148              return advance();
149              }
150            catch( IOException exception )
151              {
152              throw new CascadingException( "unable to advance values iterator", exception );
153              }
154            }
155          };
156        }
157      };
158    }
159  }