001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.tez.stream.element;
023
024import java.util.Collections;
025import java.util.List;
026import java.util.Map;
027
028import cascading.CascadingException;
029import cascading.flow.FlowProcess;
030import cascading.flow.SliceCounters;
031import cascading.flow.hadoop.HadoopCoGroupClosure;
032import cascading.flow.hadoop.util.TimedIterator;
033import cascading.flow.stream.StopDataNotificationException;
034import cascading.flow.stream.duct.DuctException;
035import cascading.flow.stream.graph.IORole;
036import cascading.flow.tez.TezCoGroupClosure;
037import cascading.pipe.CoGroup;
038import cascading.tuple.Tuple;
039import cascading.tuple.io.TuplePair;
040import cascading.util.LogUtil;
041import cascading.util.SortedListMultiMap;
042import org.apache.tez.runtime.api.LogicalInput;
043import org.apache.tez.runtime.api.LogicalOutput;
044import org.apache.tez.runtime.library.api.KeyValuesReader;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 *
050 */
051public class TezCoGroupGate extends TezGroupGate
052  {
053  private static final Logger LOG = LoggerFactory.getLogger( TezCoGroupGate.class );
054
055  protected TimedIterator<Tuple>[] timedIterators;
056
057  public TezCoGroupGate( FlowProcess flowProcess, CoGroup coGroup, IORole role, LogicalOutput logicalOutput )
058    {
059    super( flowProcess, coGroup, role, logicalOutput );
060    }
061
062  public TezCoGroupGate( FlowProcess flowProcess, CoGroup coGroup, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs )
063    {
064    super( flowProcess, coGroup, role, logicalInputs );
065
066    this.timedIterators = new TimedIterator[ logicalInputs.getKeys().size() ];
067
068    for( int i = 0; i < this.timedIterators.length; i++ )
069      this.timedIterators[ i ] = new TimedIterator<>( flowProcess, SliceCounters.Read_Duration, SliceCounters.Tuples_Read, i );
070    }
071
072  @Override
073  protected Throwable reduce() throws Exception
074    {
075    try
076      {
077      start( this );
078
079      SortedListMultiMap<Integer, KeyValuesReader> readers = getKeyValuesReaders();
080      SortedListMultiMap<Tuple, Iterable<Tuple>> iterables = getSortedMultiMap( readers.getKeys().size() );
081
082      Map.Entry<Tuple, List<Iterable<Tuple>>> current = forwardToNext( readers, iterables, null );
083      List<Iterable<Tuple>> currentValues;
084
085      while( current != null )
086        {
087        currentValues = current.getValue();
088
089        for( int i = 0; i < timedIterators.length; i++ )
090          timedIterators[ i ].reset( currentValues.get( i ) );
091
092        try
093          {
094          accept( current.getKey(), timedIterators );
095          }
096        catch( StopDataNotificationException exception )
097          {
098          LogUtil.logWarnOnce( LOG, "received unsupported stop data notification, ignoring: {}", exception.getMessage() );
099          }
100
101        current = forwardToNext( readers, iterables, currentValues );
102        }
103
104      complete( this );
105      }
106    catch( Throwable throwable )
107      {
108      if( !( throwable instanceof OutOfMemoryError ) )
109        LOG.error( "caught throwable", throwable );
110
111      return throwable;
112      }
113
114    return null;
115    }
116
117  private SortedListMultiMap<Integer, KeyValuesReader> getKeyValuesReaders() throws Exception
118    {
119    SortedListMultiMap<Integer, KeyValuesReader> readers = new SortedListMultiMap<>();
120
121    for( Map.Entry<Integer, List<LogicalInput>> entry : logicalInputs.getEntries() )
122      {
123      for( LogicalInput logicalInput : entry.getValue() )
124        readers.put( entry.getKey(), (KeyValuesReader) logicalInput.getReader() );
125      }
126
127    return readers;
128    }
129
130  private Map.Entry<Tuple, List<Iterable<Tuple>>> forwardToNext( SortedListMultiMap<Integer, KeyValuesReader> readers, SortedListMultiMap<Tuple, Iterable<Tuple>> iterables, List<Iterable<Tuple>> current )
131    {
132    try
133      {
134      int size = current == null ? readers.getKeys().size() : current.size();
135
136      for( int ordinal = 0; ordinal < size; ordinal++ )
137        {
138        if( current != null && current.get( ordinal ) == null )
139          continue;
140
141        for( KeyValuesReader reader : readers.getValues( ordinal ) )
142          {
143          if( !reader.next() )
144            continue;
145
146          Tuple currentKey = (Tuple) reader.getCurrentKey();
147
148          if( splice.isSorted() )
149            currentKey = ( (TuplePair) currentKey ).getLhs();
150
151          currentKey = getDelegatedTuple( currentKey ); // applies hasher
152
153          Iterable<Tuple> currentValues = (Iterable) reader.getCurrentValues();
154
155          iterables.set( currentKey, ordinal, currentValues );
156          }
157        }
158      }
159    catch( OutOfMemoryError error )
160      {
161      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
162      }
163    catch( CascadingException exception )
164      {
165      handleException( exception, null );
166      }
167    catch( Throwable throwable )
168      {
169      handleException( new DuctException( "internal error", throwable ), null );
170      }
171
172    return iterables.pollFirstEntry();
173    }
174
175  private SortedListMultiMap<Tuple, Iterable<Tuple>> getSortedMultiMap( final int length )
176    {
177    return new SortedListMultiMap<Tuple, Iterable<Tuple>>( getKeyComparator(), length )
178      {
179      Iterable<Tuple>[] array = new Iterable[ length ];
180
181      @Override
182      protected List createCollection()
183        {
184        List<Iterable<Tuple>> collection = super.createCollection();
185
186        Collections.addAll( collection, array ); // init with nulls
187
188        return collection;
189        }
190      };
191    }
192
193  @Override
194  protected HadoopCoGroupClosure createClosure()
195    {
196    return new TezCoGroupClosure( flowProcess, splice.getNumSelfJoins(), keyFields, valuesFields );
197    }
198
199  @Override
200  protected Tuple unwrapGrouping( Tuple key )
201    {
202    return key;
203    }
204
205  }