001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.local.stream.element;
022
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.List;
026
027import cascading.flow.FlowProcess;
028import cascading.flow.stream.duct.Duct;
029import cascading.flow.stream.element.MemorySpliceGate;
030import cascading.pipe.Splice;
031import cascading.tuple.Tuple;
032import cascading.tuple.TupleEntry;
033import com.google.common.collect.ArrayListMultimap;
034import com.google.common.collect.ListMultimap;
035import com.google.common.collect.Multimaps;
036
037/**
038 *
039 */
040public class LocalGroupByGate extends MemorySpliceGate
041  {
042  private ListMultimap<Tuple, Tuple> valueMap;
043
044  public LocalGroupByGate( FlowProcess flowProcess, Splice splice )
045    {
046    super( flowProcess, splice );
047    }
048
049  @Override
050  protected boolean isBlockingStreamed()
051    {
052    return true;
053    }
054
055  private ListMultimap<Tuple, Tuple> initNewValueMap()
056    {
057    return Multimaps.synchronizedListMultimap( ArrayListMultimap.<Tuple, Tuple>create() );
058    }
059
060  @Override
061  public void prepare()
062    {
063    super.prepare();
064
065    valueMap = initNewValueMap();
066    }
067
068  @Override
069  public void start( Duct previous )
070    {
071    // chained below in #complete()
072    }
073
074  @Override
075  public void receive( Duct previous, TupleEntry incomingEntry )
076    {
077    Tuple valuesTuple = incomingEntry.getTupleCopy();
078    Tuple groupTuple = keyBuilder[ 0 ].makeResult( valuesTuple, null ); // view on valuesTuple
079
080    groupTuple = getDelegatedTuple( groupTuple ); // wrap so hasher/comparator is honored
081
082    keys.add( groupTuple );
083    valueMap.put( groupTuple, valuesTuple );
084    }
085
086  @Override
087  public void complete( Duct previous )
088    {
089    if( count.decrementAndGet() != 0 )
090      return;
091
092    next.start( this );
093
094    // drain the keys and keyValues collections to preserve memory
095    Iterator<Tuple> iterator = keys.iterator();
096
097    // no need to synchronize here as we are guaranteed all writer threads are completed
098    while( iterator.hasNext() )
099      {
100      Tuple groupTuple = iterator.next();
101
102      iterator.remove();
103
104      keyEntry.setTuple( groupTuple );
105
106      List<Tuple> tuples = valueMap.get( groupTuple ); // can't removeAll, returns unmodifiable collection
107
108      if( valueComparators != null )
109        Collections.sort( tuples, valueComparators[ 0 ] );
110
111      tupleEntryIterator.reset( tuples.iterator() );
112
113      next.receive( this, grouping );
114
115      tuples.clear();
116      }
117
118    keys = createKeySet();
119    valueMap = initNewValueMap();
120    count.set( numIncomingEventingPaths );
121
122    next.complete( this );
123    }
124  }