001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.util.Arrays;
024import java.util.Collection;
025import java.util.Iterator;
026
027import cascading.flow.FlowProcess;
028import cascading.pipe.joiner.JoinerClosure;
029import cascading.tuple.Fields;
030import cascading.tuple.Tuple;
031import cascading.tuple.Tuples;
032import cascading.tuple.util.TupleViews;
033
034/**
035 *
036 */
037public class MemoryCoGroupClosure extends JoinerClosure
038  {
039  private Collection<Tuple>[] collections;
040  private final int numSelfJoins;
041  private final Tuple emptyTuple;
042  private Tuple joinedTuple = new Tuple(); // is discarded
043
044  private Tuple[] joinedTuplesArray;
045  private TupleBuilder joinedBuilder;
046
047  public MemoryCoGroupClosure( FlowProcess flowProcess, int numSelfJoins, Fields[] groupingFields, Fields[] valueFields )
048    {
049    super( flowProcess, groupingFields, valueFields );
050    this.numSelfJoins = numSelfJoins;
051    this.emptyTuple = Tuple.size( groupingFields[ 0 ].size() );
052
053    this.joinedTuplesArray = new Tuple[ size() ];
054    this.joinedBuilder = makeJoinedBuilder( groupingFields );
055    }
056
057  @Override
058  public int size()
059    {
060    return Math.max( joinFields.length, numSelfJoins + 1 );
061    }
062
063  public void reset( Collection<Tuple>[] collections )
064    {
065    this.collections = collections;
066    }
067
068  @Override
069  public Iterator<Tuple> getIterator( int pos )
070    {
071    if( numSelfJoins != 0 )
072      return collections[ 0 ].iterator();
073    else
074      return collections[ pos ].iterator();
075    }
076
077  @Override
078  public boolean isEmpty( int pos )
079    {
080    if( numSelfJoins != 0 )
081      return collections[ 0 ].isEmpty();
082    else
083      return collections[ pos ].isEmpty();
084    }
085
086  @Override
087  public Tuple getGroupTuple( Tuple keysTuple )
088    {
089    Tuples.asModifiable( joinedTuple );
090
091    for( int i = 0; i < collections.length; i++ )
092      joinedTuplesArray[ i ] = collections[ i ].isEmpty() ? emptyTuple : keysTuple;
093
094    joinedTuple = joinedBuilder.makeResult( joinedTuplesArray );
095
096    return joinedTuple;
097    }
098
099  static interface TupleBuilder
100    {
101    Tuple makeResult( Tuple[] tuples );
102    }
103
104  private TupleBuilder makeJoinedBuilder( final Fields[] joinFields )
105    {
106    final Fields[] fields = isSelfJoin() ? new Fields[ size() ] : joinFields;
107
108    if( isSelfJoin() )
109      Arrays.fill( fields, 0, fields.length, joinFields[ 0 ] );
110
111    return new TupleBuilder()
112    {
113    Tuple result = TupleViews.createComposite( fields );
114
115    @Override
116    public Tuple makeResult( Tuple[] tuples )
117      {
118      return TupleViews.reset( result, tuples );
119      }
120    };
121    }
122  }