001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop.stream;
022
023import java.util.Iterator;
024
025import cascading.CascadingException;
026import cascading.flow.FlowProcess;
027import cascading.flow.SliceCounters;
028import cascading.flow.hadoop.HadoopGroupByClosure;
029import cascading.flow.stream.duct.Duct;
030import cascading.flow.stream.duct.DuctException;
031import cascading.flow.stream.element.GroupingSpliceGate;
032import cascading.flow.stream.graph.IORole;
033import cascading.flow.stream.graph.StreamGraph;
034import cascading.pipe.Splice;
035import cascading.pipe.joiner.BufferJoin;
036import cascading.tap.hadoop.util.MeasuredOutputCollector;
037import cascading.tuple.Tuple;
038import cascading.tuple.TupleEntry;
039import org.apache.hadoop.mapred.OutputCollector;
040
041/**
042 *
043 */
044public abstract class HadoopGroupGate extends GroupingSpliceGate
045  {
046  protected HadoopGroupByClosure closure;
047  protected OutputCollector collector;
048
049  private final boolean isBufferJoin;
050
051  public HadoopGroupGate( FlowProcess flowProcess, Splice splice, IORole role )
052    {
053    super( flowProcess, splice, role );
054
055    isBufferJoin = splice.getJoiner() instanceof BufferJoin;
056    }
057
058  @Override
059  public void bind( StreamGraph streamGraph )
060    {
061    if( role != IORole.sink )
062      next = getNextFor( streamGraph );
063
064    if( role == IORole.sink )
065      setOrdinalMap( streamGraph );
066    }
067
068  @Override
069  public void prepare()
070    {
071    if( role != IORole.source )
072      collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() );
073
074    if( role != IORole.sink )
075      closure = createClosure();
076
077    if( grouping != null && splice.getJoinDeclaredFields() != null && splice.getJoinDeclaredFields().isNone() )
078      grouping.joinerClosure = closure;
079    }
080
081  protected abstract OutputCollector createOutputCollector();
082
083  @Override
084  public void start( Duct previous )
085    {
086    if( next != null )
087      super.start( previous );
088    }
089
090  // todo: receive should receive the edge or ordinal so no lookup
091  public void receive( Duct previous, TupleEntry incomingEntry )
092    {
093    Integer pos = ordinalMap.get( previous );
094
095    // create a view over the incoming tuple
096    Tuple groupTupleView = keyBuilder[ pos ].makeResult( incomingEntry.getTuple(), null );
097
098    // reset keyTuple via groupTuple or groupSortTuple
099    if( sortFields == null )
100      groupTuple.reset( groupTupleView );
101    else
102      groupSortTuple.reset( groupTupleView, sortBuilder[ pos ].makeResult( incomingEntry.getTuple(), null ) );
103
104    valueTuple.reset( valuesBuilder[ pos ].makeResult( incomingEntry.getTuple(), null ) );
105
106    try
107      {
108      // keyTuple is a reference to either groupTuple or groupSortTuple
109      wrapGroupingAndCollect( previous, (Tuple) valueTuple, keyTuple );
110      flowProcess.increment( SliceCounters.Tuples_Written, 1 );
111      }
112    catch( OutOfMemoryError error )
113      {
114      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
115      }
116    catch( CascadingException exception )
117      {
118      handleException( exception, incomingEntry );
119      }
120    catch( Throwable throwable )
121      {
122      handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry );
123      }
124    }
125
126  @Override
127  public void complete( Duct previous )
128    {
129    if( next != null )
130      super.complete( previous );
131    }
132
133  public void accept( Tuple key, Iterator<Tuple>[] values )
134    {
135    key = unwrapGrouping( key );
136
137    closure.reset( key, values );
138
139    // Buffer is using JoinerClosure directly
140    if( !isBufferJoin )
141      tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );
142    else
143      tupleEntryIterator.reset( values );
144
145    keyEntry.setTuple( closure.getGroupTuple( key ) );
146
147    next.receive( this, grouping );
148    }
149
150  protected abstract HadoopGroupByClosure createClosure();
151
152  protected abstract void wrapGroupingAndCollect( Duct previous, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException;
153
154  protected abstract Tuple unwrapGrouping( Tuple key );
155  }