001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.hadoop;
022    
023    import java.util.Iterator;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.pipe.joiner.JoinerClosure;
027    import cascading.tuple.Fields;
028    import cascading.tuple.Tuple;
029    import cascading.tuple.util.TupleBuilder;
030    import cascading.tuple.util.TupleViews;
031    
032    /** Class GroupClosure is used internally to represent groups of tuples during grouping. */
033    public class HadoopGroupByClosure extends JoinerClosure
034      {
035      protected Tuple grouping;
036      protected Iterator values;
037    
038      public HadoopGroupByClosure( FlowProcess flowProcess, Fields[] groupingFields, Fields[] valueFields )
039        {
040        super( flowProcess, groupingFields, valueFields );
041        }
042    
043      public Tuple getGrouping()
044        {
045        return grouping;
046        }
047    
048      public int size()
049        {
050        return 1;
051        }
052    
053      @Override
054      public Iterator getIterator( int pos )
055        {
056        if( pos != 0 )
057          throw new IllegalArgumentException( "invalid group position: " + pos );
058    
059        return makeIterator( 0, values );
060        }
061    
062      @Override
063      public boolean isEmpty( int pos )
064        {
065        return values != null;
066        }
067    
068      protected Iterator<Tuple> makeIterator( final int pos, final Iterator values )
069        {
070        return new Iterator<Tuple>()
071        {
072        final int cleanPos = valueFields.length == 1 ? 0 : pos; // support repeated pipes
073        TupleBuilder[] valueBuilder = new TupleBuilder[ valueFields.length ];
074    
075        {
076        for( int i = 0; i < valueFields.length; i++ )
077          valueBuilder[ i ] = makeBuilder( valueFields[ i ], joinFields[ i ] );
078        }
079    
080        private TupleBuilder makeBuilder( final Fields valueField, final Fields joinField )
081          {
082          if( valueField.isUnknown() || joinField.isNone() )
083            return new TupleBuilder()
084            {
085            @Override
086            public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
087              {
088              valueTuple.set( valueFields[ cleanPos ], joinFields[ cleanPos ], groupTuple );
089    
090              return valueTuple;
091              }
092            };
093    
094          return new TupleBuilder()
095          {
096          Tuple result = TupleViews.createOverride( valueField, joinField );
097    
098          @Override
099          public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
100            {
101            return TupleViews.reset( result, valueTuple, groupTuple );
102            }
103          };
104          }
105    
106        public boolean hasNext()
107          {
108          return values.hasNext();
109          }
110    
111        public Tuple next()
112          {
113          Tuple tuple = (Tuple) values.next();
114    
115          return valueBuilder[ cleanPos ].makeResult( tuple, grouping );
116          }
117    
118        public void remove()
119          {
120          throw new UnsupportedOperationException( "remove not supported" );
121          }
122        };
123        }
124    
125      public void reset( Tuple grouping, Iterator values )
126        {
127        this.grouping = grouping;
128        this.values = values;
129        }
130    
131      @Override
132      public Tuple getGroupTuple( Tuple keysTuple )
133        {
134        return keysTuple;
135        }
136      }