/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.stream;

import java.util.Iterator;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.hadoop.HadoopGroupByClosure;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.DuctException;
import cascading.flow.stream.element.GroupingSpliceGate;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.graph.StreamGraph;
import cascading.pipe.Splice;
import cascading.pipe.joiner.BufferJoin;
import cascading.tap.hadoop.util.MeasuredOutputCollector;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.apache.hadoop.mapred.OutputCollector;

/**
 * Base class for the Hadoop-specific grouping and cogrouping stream gates. On the map side,
 * {@link #receive(Duct, int, TupleEntry)} builds grouping, sort, and value tuple views over each
 * incoming entry and emits them to the Hadoop {@link OutputCollector}. On the reduce side,
 * {@link #accept(Tuple, Iterator[])} resets the {@link HadoopGroupByClosure} with each grouping key
 * and its value iterators, then pushes the joined results to the next duct.
 */
public abstract class HadoopGroupGate extends GroupingSpliceGate
  {
  protected HadoopGroupByClosure closure;
  protected OutputCollector collector;

  private final boolean isBufferJoin;

  public HadoopGroupGate( FlowProcess flowProcess, Splice splice, IORole role )
    {
    super( flowProcess, splice, role );

    isBufferJoin = splice.getJoiner() instanceof BufferJoin;
    }

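  // sinks have no downstream duct to wire to; they emit through the Hadoop OutputCollector instead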
  @Override
  public void bind( StreamGraph streamGraph )
    {
    if( role != IORole.sink )
      next = getNextFor( streamGraph );
    }

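  // a non-source gate writes, so it wraps its collector to track write duration;
  // a non-sink gate reads groupings, so it needs a closure for the joiner to iterate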
  @Override
  public void prepare()
    {
    if( role != IORole.source )
      collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() );

    if( role != IORole.sink )
      closure = createClosure();

    if( grouping != null && splice.getJoinDeclaredFields() != null && splice.getJoinDeclaredFields().isNone() )
      grouping.joinerClosure = closure;
    }

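  // subclasses supply the Hadoop OutputCollector this gate writes grouped tuples to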
  protected abstract OutputCollector createOutputCollector();

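  // propagate start only when a downstream duct exists (sinks have none)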
  @Override
  public void start( Duct previous )
    {
    if( next != null )
      super.start( previous );
    }

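  // map-side path: build grouping, optional sort, and value views over the incoming tuple,
  // then emit them to the collector keyed on the grouping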
  // todo: receive should receive the edge or ordinal so no lookup
  @Override
  public void receive( Duct previous, int ordinal, TupleEntry incomingEntry )
    {
    // create a view over the incoming tuple
    Tuple groupTupleView = keyBuilder[ ordinal ].makeResult( incomingEntry.getTuple(), null );

    // reset keyTuple via groupTuple or groupSortTuple
    if( sortFields == null )
      groupTuple.reset( groupTupleView );
    else
      groupSortTuple.reset( groupTupleView, sortBuilder[ ordinal ].makeResult( incomingEntry.getTuple(), null ) );

    valueTuple.reset( valuesBuilder[ ordinal ].makeResult( incomingEntry.getTuple(), null ) );

    try
      {
      // keyTuple is a reference to either groupTuple or groupSortTuple
      wrapGroupingAndCollect( previous, ordinal, (Tuple) valueTuple, keyTuple );
      flowProcess.increment( SliceCounters.Tuples_Written, 1 );
      }
    catch( OutOfMemoryError error )
      {
      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
      }
    catch( CascadingException exception )
      {
      handleException( exception, incomingEntry );
      }
    catch( Throwable throwable )
      {
      handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry );
      }
    }

  @Override
  public void complete( Duct previous )
    {
    if( next != null )
      super.complete( previous );
    }

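  // reduce-side path: called once per grouping key with an iterator of values per incoming ordinal;
  // the joiner produces the joined tuples (a BufferJoin reads the closure directly) before pushing downstream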
  public void accept( Tuple key, Iterator<Tuple>[] values )
    {
    key = unwrapGrouping( key );

    closure.reset( key, values );

    // Buffer is using JoinerClosure directly
    if( !isBufferJoin )
      tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );
    else
      tupleEntryIterator.reset( values );

    keyEntry.setTuple( closure.getGroupTuple( key ) );

    next.receive( this, 0, grouping );
    }

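  // subclasses create the closure that presents the reduce-side values to the joiner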
  protected abstract HadoopGroupByClosure createClosure();

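  // subclasses wrap the grouping key and values as required by their join strategy before writing to the collector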
  protected abstract void wrapGroupingAndCollect( Duct previous, int ordinal, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException;

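  // subclasses strip any wrapping applied in wrapGroupingAndCollect from the reduce-side key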
  protected abstract Tuple unwrapGrouping( Tuple key );
  }