001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.tez.stream.element;
023
024import cascading.flow.FlowProcess;
025import cascading.flow.SliceCounters;
026import cascading.flow.hadoop.HadoopGroupByClosure;
027import cascading.flow.hadoop.util.TimedIterator;
028import cascading.flow.stream.StopDataNotificationException;
029import cascading.flow.stream.graph.IORole;
030import cascading.flow.tez.TezGroupByClosure;
031import cascading.flow.tez.util.SecondarySortKeyValuesReader;
032import cascading.pipe.GroupBy;
033import cascading.tuple.Tuple;
034import cascading.tuple.io.TuplePair;
035import cascading.util.LogUtil;
036import cascading.util.SortedListMultiMap;
037import cascading.util.Util;
038import org.apache.tez.runtime.api.LogicalInput;
039import org.apache.tez.runtime.api.LogicalOutput;
040import org.apache.tez.runtime.library.api.KeyValuesReader;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 *
046 */
047public class TezGroupByGate extends TezGroupGate
048  {
049  private static final Logger LOG = LoggerFactory.getLogger( TezGroupByGate.class );
050
051  protected TimedIterator[] timedIterators;
052
053  public TezGroupByGate( FlowProcess flowProcess, GroupBy groupBy, IORole role, LogicalOutput logicalOutput )
054    {
055    super( flowProcess, groupBy, role, logicalOutput );
056    }
057
058  public TezGroupByGate( FlowProcess flowProcess, GroupBy groupBy, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs )
059    {
060    super( flowProcess, groupBy, role, logicalInputs );
061
062    this.timedIterators = TimedIterator.iterators( new TimedIterator<>( flowProcess, SliceCounters.Read_Duration, SliceCounters.Tuples_Read ) );
063    }
064
065  protected Throwable reduce() throws Exception
066    {
067    try
068      {
069      start( this );
070
071      // if multiple ordinals, an input could be duplicated if sourcing multiple paths
072      LogicalInput logicalInput = Util.getFirst( logicalInputs.getValues() );
073
074      KeyValuesReader reader = (KeyValuesReader) logicalInput.getReader();
075
076      if( sortFields != null )
077        reader = new SecondarySortKeyValuesReader( reader, groupComparators[ 0 ] );
078
079      while( reader.next() )
080        {
081        Tuple currentKey = (Tuple) reader.getCurrentKey(); // if secondary sorting, is a TuplePair
082        Iterable currentValues = reader.getCurrentValues();
083
084        timedIterators[ 0 ].reset( currentValues );
085
086        try
087          {
088          accept( currentKey, timedIterators ); // will unwrap the TuplePair
089          }
090        catch( StopDataNotificationException exception )
091          {
092          LogUtil.logWarnOnce( LOG, "received unsupported stop data notification, ignoring: {}", exception.getMessage() );
093          }
094        }
095
096      complete( this );
097      }
098    catch( Throwable throwable )
099      {
100      if( !( throwable instanceof OutOfMemoryError ) )
101        LOG.error( "caught throwable", throwable );
102
103      return throwable;
104      }
105
106    return null;
107    }
108
109  @Override
110  protected HadoopGroupByClosure createClosure()
111    {
112    return new TezGroupByClosure( flowProcess, keyFields, valuesFields );
113    }
114
115  @Override
116  protected Tuple unwrapGrouping( Tuple key )
117    {
118    // copying the lhs key during secondary sorting prevents the key from advancing at the end of the
119    // aggregation iterator
120    return sortFields == null ? key : new Tuple( ( (TuplePair) key ).getLhs() );
121    }
122  }