001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop;
022
023import java.util.Iterator;
024
025import cascading.flow.FlowProcess;
026import cascading.pipe.joiner.JoinerClosure;
027import cascading.tuple.Fields;
028import cascading.tuple.Tuple;
029import cascading.tuple.util.TupleBuilder;
030import cascading.tuple.util.TupleViews;
031
032/** Class GroupClosure is used internally to represent groups of tuples during grouping. */
033public class HadoopGroupByClosure extends JoinerClosure
034  {
035  protected Tuple grouping;
036  protected Iterator[] values;
037
038  public HadoopGroupByClosure( FlowProcess flowProcess, Fields[] groupingFields, Fields[] valueFields )
039    {
040    super( flowProcess, groupingFields, valueFields );
041    }
042
043  public Tuple getGrouping()
044    {
045    return grouping;
046    }
047
048  public int size()
049    {
050    return 1;
051    }
052
053  protected Iterator getValueIterator( int pos )
054    {
055    return values[ pos ];
056    }
057
058  @Override
059  public Iterator<Tuple> getIterator( int pos )
060    {
061    if( pos != 0 )
062      throw new IllegalArgumentException( "invalid group position: " + pos );
063
064    return makeIterator( 0, getValueIterator( 0 ) );
065    }
066
067  @Override
068  public boolean isEmpty( int pos )
069    {
070    return values != null;
071    }
072
073  protected Iterator<Tuple> makeIterator( final int pos, final Iterator values )
074    {
075    return new Iterator<Tuple>()
076      {
077      final int cleanPos = valueFields.length == 1 ? 0 : pos; // support repeated pipes
078      TupleBuilder[] valueBuilder = new TupleBuilder[ valueFields.length ];
079
080      {
081      for( int i = 0; i < valueFields.length; i++ )
082        valueBuilder[ i ] = makeBuilder( valueFields[ i ], joinFields[ i ] );
083      }
084
085      private TupleBuilder makeBuilder( final Fields valueField, final Fields joinField )
086        {
087        if( valueField.isUnknown() && joinField.hasRelativePos() )
088          return new TupleBuilder()
089            {
090            @Override
091            public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
092              {
093              Fields fields = joinFields[ cleanPos ];
094
095              fields = Fields.size( valueTuple.size() ).select( fields );
096
097              valueTuple.set( valueFields[ cleanPos ], fields, groupTuple );
098
099              return valueTuple;
100              }
101            };
102
103        if( valueField.isUnknown() || joinField.isNone() )
104          return new TupleBuilder()
105            {
106            @Override
107            public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
108              {
109              valueTuple.set( valueFields[ cleanPos ], joinFields[ cleanPos ], groupTuple );
110
111              return valueTuple;
112              }
113            };
114
115        return new TupleBuilder()
116          {
117          Tuple result = TupleViews.createOverride( valueField, joinField );
118
119          @Override
120          public Tuple makeResult( Tuple valueTuple, Tuple groupTuple )
121            {
122            return TupleViews.reset( result, valueTuple, groupTuple );
123            }
124          };
125        }
126
127      public boolean hasNext()
128        {
129        return values.hasNext();
130        }
131
132      public Tuple next()
133        {
134        Tuple tuple = (Tuple) values.next();
135
136        return valueBuilder[ cleanPos ].makeResult( tuple, grouping );
137        }
138
139      public void remove()
140        {
141        throw new UnsupportedOperationException( "remove not supported" );
142        }
143      };
144    }
145
146  public void reset( Tuple grouping, Iterator<Tuple>[] values )
147    {
148    this.grouping = grouping;
149    this.values = values;
150    }
151
152  @Override
153  public Tuple getGroupTuple( Tuple keysTuple )
154    {
155    return keysTuple;
156    }
157  }