/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.pipe.joiner.JoinerClosure;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.util.TupleBuilder;
import cascading.tuple.util.TupleViews;

/**
 * Class HadoopGroupByClosure is used internally to represent a single group of tuples during grouping.
 * <p>
 * The closure holds the current grouping {@link Tuple} and an array of value iterators, both supplied
 * through {@link #reset(Tuple, Iterator[])}. It always reports a {@link #size()} of one, and
 * {@link #getIterator(int)} only accepts position {@code 0}.
 */
public class HadoopGroupByClosure extends JoinerClosure
  {
  /** The grouping tuple most recently supplied via {@link #reset(Tuple, Iterator[])}. */
  protected Tuple grouping;
  /** The value iterators most recently supplied via {@link #reset(Tuple, Iterator[])}, {@code null} until then. */
  protected Iterator[] values;

  /**
   * Creates a new closure, forwarding all arguments to the {@link JoinerClosure} constructor.
   *
   * @param flowProcess    the current flow process
   * @param groupingFields the grouping fields, one entry per incoming stream
   * @param valueFields    the value fields, one entry per incoming stream
   */
  public HadoopGroupByClosure( FlowProcess flowProcess, Fields[] groupingFields, Fields[] valueFields )
    {
    super( flowProcess, groupingFields, valueFields );
    }

  /**
   * Returns the current grouping tuple.
   *
   * @return the tuple last passed to {@link #reset(Tuple, Iterator[])}, or {@code null} if never reset
   */
  public Tuple getGrouping()
    {
    return grouping;
    }

  /**
   * Returns the number of streams exposed by this closure.
   *
   * @return always {@code 1}
   */
  public int size()
    {
    return 1;
    }

  /**
   * Returns the raw value iterator stored at the given position.
   *
   * @param pos index into the value iterator array
   * @return the iterator at {@code pos}
   */
  protected Iterator getValueIterator( int pos )
    {
    return values[ pos ];
    }

  /**
   * Returns an iterator over the value tuples of the current group.
   *
   * @param pos the stream position, must be {@code 0}
   * @return an iterator that passes each value tuple through the matching {@link TupleBuilder}
   * @throws IllegalArgumentException if {@code pos} is not {@code 0}
   */
  @Override
  public Iterator<Tuple> getIterator( int pos )
    {
    if( pos != 0 )
      throw new IllegalArgumentException( "invalid group position: " + pos );

    return makeIterator( 0, getValueIterator( 0 ) );
    }

  /**
   * Reports whether this closure currently has no values to iterate.
   * <p>
   * The closure is considered empty only while no value iterators have been supplied through
   * {@link #reset(Tuple, Iterator[])}; the iterators themselves are not inspected and {@code pos}
   * is not consulted.
   *
   * @param pos the stream position (unused)
   * @return {@code true} if no value iterators are set
   */
  @Override
  public boolean isEmpty( int pos )
    {
    // the previous test (values != null) was inverted, reporting "empty" exactly when values were present
    return values == null;
    }

  /**
   * Wraps the given raw value iterator so that every tuple it yields is combined with the current
   * grouping tuple by a {@link TupleBuilder} chosen from the value and join fields.
   *
   * @param pos    the stream position the values belong to
   * @param values the raw iterator of value tuples
   * @return an iterator of result tuples; {@code remove()} is not supported
   */
  protected Iterator<Tuple> makeIterator( final int pos, final Iterator values )
    {
    return new Iterator<Tuple>()
      {
      final int cleanPos = valueFields.length == 1 ? 0 : pos; // support repeated pipes
      final TupleBuilder[] valueBuilder = new TupleBuilder[ valueFields.length ];

      {
      // one builder per declared value/join fields pair; only the one at cleanPos is used by next()
      for( int i = 0; i < valueFields.length; i++ )
        valueBuilder[ i ] = makeBuilder( valueFields[ i ], joinFields[ i ] );
      }

      /**
       * Chooses how a value tuple and the grouping tuple are merged for the given fields.
       * Each returned builder works on the fields it was created for.
       */
      private TupleBuilder makeBuilder( final Fields valueField, final Fields joinField )
        {
        // unknown value fields with relative join positions: resolve the join fields against
        // the actual size of each value tuple before writing the group values into it
        if( valueField.isUnknown() && joinField.hasRelativePos() )
          return new TupleBuilder()
            {
            @Override
            public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
              {
              Fields resolved = Fields.size( valueTuple.size() ).select( joinField );

              valueTuple.set( valueField, resolved, groupTuple );

              return valueTuple;
              }
            };

        // unknown value fields or no join fields: write the group values into the value tuple in place
        if( valueField.isUnknown() || joinField.isNone() )
          return new TupleBuilder()
            {
            @Override
            public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
              {
              valueTuple.set( valueField, joinField, groupTuple );

              return valueTuple;
              }
            };

        // otherwise reuse a single override view that is re-pointed at each value/group pair
        return new TupleBuilder()
          {
          Tuple result = TupleViews.createOverride( valueField, joinField );

          @Override
          public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
            {
            return TupleViews.reset( result, valueTuple, groupTuple );
            }
          };
        }

      @Override
      public boolean hasNext()
        {
        return values.hasNext();
        }

      @Override
      public Tuple next()
        {
        Tuple tuple = (Tuple) values.next();

        return valueBuilder[ cleanPos ].makeResult( tuple, grouping );
        }

      @Override
      public void remove()
        {
        throw new UnsupportedOperationException( "remove not supported" );
        }
      };
    }

  /**
   * Points this closure at a new group.
   *
   * @param grouping the grouping tuple of the new group
   * @param values   the value iterators of the new group
   */
  public void reset( Tuple grouping, Iterator<Tuple>[] values )
    {
    this.grouping = grouping;
    this.values = values;
    }

  /**
   * Returns the group tuple for the given keys tuple.
   *
   * @param keysTuple the keys tuple
   * @return {@code keysTuple} itself, unchanged
   */
  @Override
  public Tuple getGroupTuple( Tuple keysTuple )
    {
    return keysTuple;
    }
  }