001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.element;
023
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.Set;
029
030import cascading.flow.FlowProcess;
031import cascading.flow.stream.duct.Duct;
032import cascading.pipe.Splice;
033import cascading.tuple.Tuple;
034import cascading.tuple.TupleEntry;
035import cascading.tuple.Tuples;
036
037/**
038 *
039 */
040public class MemoryCoGroupGate extends MemorySpliceGate
041  {
042  public MemoryCoGroupGate( FlowProcess flowProcess, Splice splice )
043    {
044    super( flowProcess, splice );
045    }
046
047  @Override
048  protected boolean isBlockingStreamed()
049    {
050    return true;
051    }
052
053  @Override
054  public void start( Duct previous )
055    {
056    }
057
058  @Override
059  public void receive( Duct previous, int ordinal, TupleEntry incomingEntry )
060    {
061    Tuple valuesTuple = incomingEntry.getTupleCopy();
062    Tuple groupTuple = keyBuilder[ ordinal ].makeResult( valuesTuple, null ); // view on valuesTuple
063
064    groupTuple = getDelegatedTuple( groupTuple ); // wrap so hasher/comparator is honored
065
066    keys.add( groupTuple );
067    keyValues[ ordinal ].get( groupTuple ).add( valuesTuple );
068    }
069
070  @Override
071  public void complete( Duct previous )
072    {
073    if( count.decrementAndGet() != 0 )
074      return;
075
076    next.start( this );
077
078    Collection<Tuple>[] collections = new Collection[ keyValues.length ];
079    Iterator<Tuple> keyIterator = keys.iterator();
080
081    Set<Tuple> seenNulls = new HashSet<Tuple>();
082
083    while( keyIterator.hasNext() )
084      {
085      Tuple keysTuple = keyIterator.next();
086
087      keyIterator.remove();
088
089      // provides sql like semantics
090      if( nullsAreNotEqual && Tuples.frequency( keysTuple, null ) != 0 )
091        {
092        if( seenNulls.contains( keysTuple ) )
093          continue;
094
095        seenNulls.add( keysTuple );
096
097        for( int i = 0; i < keyValues.length; i++ )
098          {
099          Collection<Tuple> values = keyValues[ i ].remove( keysTuple );
100
101          if( values == null )
102            continue;
103
104          for( int j = 0; j < keyValues.length; j++ )
105            collections[ j ] = Collections.emptyList();
106
107          collections[ i ] = values;
108
109          push( collections, keysTuple );
110          }
111        }
112      else
113        {
114        // drain the keys and keyValues collections to preserve memory
115        for( int i = 0; i < keyValues.length; i++ )
116          {
117          collections[ i ] = keyValues[ i ].remove( keysTuple );
118
119          if( collections[ i ] == null )
120            collections[ i ] = Collections.emptyList();
121          }
122
123        push( collections, keysTuple );
124        }
125      }
126
127    keys = createKeySet();
128    keyValues = createKeyValuesArray();
129
130    count.set( numIncomingEventingPaths );
131
132    next.complete( this );
133    }
134
135  private void push( Collection<Tuple>[] collections, Tuple keysTuple )
136    {
137    closure.reset( collections );
138
139    keyEntry.setTuple( closure.getGroupTuple( keysTuple ) );
140
141    // create Closure type here
142    tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );
143
144    next.receive( this, 0, grouping );
145    }
146  }