/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.stream;

import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.hadoop.HadoopGroupByClosure;
import cascading.flow.stream.Duct;
import cascading.flow.stream.SpliceGate;
import cascading.flow.stream.StreamGraph;
import cascading.pipe.Splice;
import cascading.pipe.joiner.BufferJoin;
import cascading.tap.hadoop.util.MeasuredOutputCollector;
import cascading.tuple.Tuple;
import org.apache.hadoop.mapred.OutputCollector;

/**
 * Abstract {@link SpliceGate} used for grouping when running on Hadoop.
 * <p/>
 * The gate keeps a {@link HadoopGroupByClosure} and an {@link OutputCollector}. The collector is created
 * in {@link #prepare()}; the closure is not assigned anywhere in this class, so concrete subclasses are
 * expected to initialize it before {@link #run(Tuple, Iterator)} is invoked.
 * <p/>
 * Subclasses must also implement {@link #unwrapGrouping(Tuple)}, which is applied to every key handed
 * to {@link #run(Tuple, Iterator)}.
 */
public abstract class HadoopGroupGate extends SpliceGate
  {
  /** Closure reset with the current key and values on every {@link #run(Tuple, Iterator)} call; assigned outside this class. */
  protected HadoopGroupByClosure closure;
  /** Collector created in {@link #prepare()}, wrapping the collector obtained from the flow process. */
  protected OutputCollector collector;

  /**
   * Creates a new gate, delegating all arguments to the {@link SpliceGate} constructor.
   *
   * @param flowProcess the current flow process; {@link #prepare()} casts it to {@link HadoopFlowProcess}
   * @param splice      the splice this gate handles
   * @param role        the role of this gate; {@link #bind(StreamGraph)} skips resolving the next duct for {@code Role.sink}
   */
  public HadoopGroupGate( FlowProcess flowProcess, Splice splice, Role role )
    {
    super( flowProcess, splice, role );
    }

  /**
   * Resolves the ducts adjacent to this gate from the given graph.
   * <p/>
   * All previous ducts are always resolved. The next duct is only resolved when this gate
   * is not in the sink role, so {@code next} is left untouched by this method for a sink.
   *
   * @param streamGraph the graph to look the neighbouring ducts up in
   */
  @Override
  public void bind( StreamGraph streamGraph )
    {
    allPrevious = getAllPreviousFor( streamGraph );

    // a sink has nothing downstream to bind to
    if( role == Role.sink )
      return;

    next = getNextFor( streamGraph );
    }

  /**
   * Creates the {@link #collector} as a {@link MeasuredOutputCollector} registered against
   * {@link SliceCounters#Write_Duration} and wrapping the collector supplied by the
   * {@link HadoopFlowProcess}.
   */
  @Override
  public void prepare()
    {
    HadoopFlowProcess hadoopFlowProcess = (HadoopFlowProcess) flowProcess;

    collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, hadoopFlowProcess.getOutputCollector() );
    }

  /**
   * Forwards the start event to the super class, but only when a next duct has been bound.
   *
   * @param previous the duct that signalled the start
   */
  @Override
  public void start( Duct previous )
    {
    if( next == null )
      return;

    super.start( previous );
    }

  /**
   * Forwards the complete event to the super class, but only when a next duct has been bound.
   *
   * @param previous the duct that signalled completion
   */
  @Override
  public void complete( Duct previous )
    {
    if( next == null )
      return;

    super.complete( previous );
    }

  /**
   * Handles a single key and its values, and passes the resulting grouping to the next duct.
   * <p/>
   * The key is first passed through {@link #unwrapGrouping(Tuple)} and the closure is reset with the
   * unwrapped key and the incoming values. Unless the splice uses a {@link BufferJoin}, the values are
   * replaced by the iterator the joiner creates from the closure.
   * <p/>
   * NOTE(review): {@code next} is dereferenced without a null check here, unlike in
   * {@link #start(Duct)} and {@link #complete(Duct)} -- callers presumably only invoke this method
   * on gates with a bound next duct; confirm against the call sites.
   *
   * @param key    the incoming key, before unwrapping
   * @param values the values associated with the key
   */
  public void run( Tuple key, Iterator values )
    {
    Tuple groupKey = unwrapGrouping( key );

    closure.reset( groupKey, values );

    Iterator groupValues = values;
    boolean isBufferJoin = splice.getJoiner() instanceof BufferJoin;

    // Buffer is using JoinerClosure directly
    if( !isBufferJoin )
      groupValues = splice.getJoiner().getIterator( closure );

    keyEntry.setTuple( closure.getGroupTuple( groupKey ) );
    tupleEntryIterator.reset( groupValues );

    next.receive( this, grouping );
    }

  /**
   * Converts the key received by {@link #run(Tuple, Iterator)} into the key used to reset the closure
   * and to look up the group tuple.
   *
   * @param key the key as handed to {@link #run(Tuple, Iterator)}
   * @return the unwrapped key
   */
  protected abstract Tuple unwrapGrouping( Tuple key );
  }