001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.element;
023
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.Set;
029
030import cascading.flow.FlowProcess;
031import cascading.flow.stream.StopDataNotificationException;
032import cascading.flow.stream.duct.Duct;
033import cascading.pipe.Splice;
034import cascading.tuple.Tuple;
035import cascading.tuple.TupleEntry;
036import cascading.tuple.Tuples;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 *
042 */
043public class MemoryCoGroupGate extends MemorySpliceGate
044  {
045  private static final Logger LOG = LoggerFactory.getLogger( MemoryCoGroupGate.class );
046
047  public MemoryCoGroupGate( FlowProcess flowProcess, Splice splice )
048    {
049    super( flowProcess, splice );
050    }
051
052  @Override
053  protected boolean isBlockingStreamed()
054    {
055    return true;
056    }
057
058  @Override
059  public void start( Duct previous )
060    {
061    }
062
063  @Override
064  public void receive( Duct previous, int ordinal, TupleEntry incomingEntry )
065    {
066    Tuple valuesTuple = incomingEntry.getTupleCopy();
067    Tuple groupTuple = keyBuilder[ ordinal ].makeResult( valuesTuple, null ); // view on valuesTuple
068
069    groupTuple = getDelegatedTuple( groupTuple ); // wrap so hasher/comparator is honored
070
071    keys.add( groupTuple );
072    keyValues[ ordinal ].get( groupTuple ).add( valuesTuple );
073    }
074
075  @Override
076  public void complete( Duct previous )
077    {
078    if( count.decrementAndGet() != 0 )
079      return;
080
081    next.start( this );
082
083    Collection<Tuple>[] collections = new Collection[ keyValues.length ];
084    Iterator<Tuple> keyIterator = keys.iterator();
085
086    Set<Tuple> seenNulls = new HashSet<Tuple>();
087
088    while( keyIterator.hasNext() )
089      {
090      Tuple keysTuple = keyIterator.next();
091
092      keyIterator.remove();
093
094      // provides sql like semantics
095      if( nullsAreNotEqual && Tuples.frequency( keysTuple, null ) != 0 )
096        {
097        if( seenNulls.contains( keysTuple ) )
098          continue;
099
100        seenNulls.add( keysTuple );
101
102        for( int i = 0; i < keyValues.length; i++ )
103          {
104          Collection<Tuple> values = keyValues[ i ].remove( keysTuple );
105
106          if( values == null )
107            continue;
108
109          for( int j = 0; j < keyValues.length; j++ )
110            collections[ j ] = Collections.emptyList();
111
112          collections[ i ] = values;
113
114          try
115            {
116            push( collections, keysTuple );
117            }
118          catch( StopDataNotificationException exception )
119            {
120            LOG.info( "received stop data notification: {}", exception.getMessage() );
121            break;
122            }
123          }
124        }
125      else
126        {
127        // drain the keys and keyValues collections to preserve memory
128        for( int i = 0; i < keyValues.length; i++ )
129          {
130          collections[ i ] = keyValues[ i ].remove( keysTuple );
131
132          if( collections[ i ] == null )
133            collections[ i ] = Collections.emptyList();
134          }
135
136        try
137          {
138          push( collections, keysTuple );
139          }
140        catch( StopDataNotificationException exception )
141          {
142          LOG.info( "received stop data notification: {}", exception.getMessage() );
143          break;
144          }
145        }
146      }
147
148    keys = createKeySet();
149    keyValues = createKeyValuesArray();
150
151    count.set( numIncomingEventingPaths );
152
153    next.complete( this );
154    }
155
156  private void push( Collection<Tuple>[] collections, Tuple keysTuple )
157    {
158    closure.reset( collections );
159
160    keyEntry.setTuple( closure.getGroupTuple( keysTuple ) );
161
162    // create Closure type here
163    tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );
164
165    next.receive( this, 0, grouping );
166    }
167  }