/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.local.stream;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import cascading.flow.FlowProcess;
import cascading.flow.stream.Duct;
import cascading.flow.stream.MemorySpliceGate;
import cascading.pipe.Splice;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;

/**
 * A {@link MemorySpliceGate} that buffers every received tuple in memory, bucketed by its
 * grouping key, and forwards the accumulated groupings to the next duct only after the
 * last incoming path has signalled completion.
 * <p>
 * Values are held in a synchronized {@link ListMultimap}; the set of distinct keys is kept
 * in the inherited {@code keys} collection.
 */
public class LocalGroupByGate extends MemorySpliceGate
{
  /** Buffered value tuples, indexed by their (delegated) grouping key. */
  private ListMultimap<Tuple, Tuple> valueMap;

  /**
   * Creates a new gate; both arguments are handed straight to the superclass constructor.
   *
   * @param flowProcess the flow process passed to the superclass
   * @param splice      the splice passed to the superclass
   */
  public LocalGroupByGate( FlowProcess flowProcess, Splice splice )
  {
    super( flowProcess, splice );
  }

  /**
   * Always answers {@code true}.
   * NOTE(review): the exact contract is defined by the superclass; presumably this marks
   * the gate as one that blocks until all input has arrived - confirm.
   *
   * @return {@code true}
   */
  @Override
  protected boolean isBlockingStreamed()
  {
    return true;
  }

  /**
   * Builds an empty value buffer: an {@link ArrayListMultimap} wrapped by
   * {@link Multimaps#synchronizedListMultimap(ListMultimap)}.
   *
   * @return a fresh, empty, synchronized multimap
   */
  private ListMultimap<Tuple, Tuple> createValueBuffer()
  {
    ListMultimap<Tuple, Tuple> backing = ArrayListMultimap.<Tuple, Tuple>create();

    return Multimaps.synchronizedListMultimap( backing );
  }

  /**
   * Runs the superclass preparation, then installs an empty value buffer.
   */
  @Override
  public void prepare()
  {
    super.prepare();

    valueMap = createValueBuffer();
  }

  /**
   * Intentionally does nothing; the downstream {@code start} call is issued from
   * {@link #complete(Duct)} instead.
   *
   * @param previous the upstream duct (unused)
   */
  @Override
  public void start( Duct previous )
  {
    // chained below in #complete()
  }

  /**
   * Buffers one incoming entry: copies its tuple, derives the grouping key from the copy,
   * and records both the key and the value.
   *
   * @param previous      the upstream duct (unused)
   * @param incomingEntry the entry whose tuple is copied and buffered
   */
  @Override
  public void receive( Duct previous, TupleEntry incomingEntry )
  {
    Tuple bufferedValues = incomingEntry.getTupleCopy();

    // the key produced here is a view on bufferedValues
    Tuple keyView = keyBuilder[ 0 ].makeResult( bufferedValues, null );

    // wrap so hasher/comparator is honored
    Tuple groupKey = getDelegatedTuple( keyView );

    keys.add( groupKey );
    valueMap.put( groupKey, bufferedValues );
  }

  /**
   * Counts down one incoming path. When the counter reaches zero, starts the next duct,
   * pushes every buffered grouping to it (sorting the values first when value comparators
   * are present), resets the key set, value buffer and counter, and finally completes the
   * next duct.
   *
   * @param previous the upstream duct (unused)
   */
  @Override
  public void complete( Duct previous )
  {
    boolean allPathsCompleted = count.decrementAndGet() == 0;

    if( !allPathsCompleted )
    {
      return;
    }

    next.start( this );

    // drain the keys and value collections as we go to preserve memory;
    // no need to synchronize here as we are guaranteed all writer threads are completed
    for( Iterator<Tuple> keyIterator = keys.iterator(); keyIterator.hasNext(); )
    {
      Tuple currentKey = keyIterator.next();

      keyIterator.remove();

      keyEntry.setTuple( currentKey );

      // can't removeAll, returns unmodifiable collection
      List<Tuple> groupValues = valueMap.get( currentKey );

      if( valueComparators != null )
      {
        Collections.sort( groupValues, valueComparators[ 0 ] );
      }

      tupleEntryIterator.reset( groupValues.iterator() );

      next.receive( this, grouping );

      // release this grouping's values once they have been forwarded
      groupValues.clear();
    }

    // return to the initial state before signalling completion downstream
    keys = createKeySet();
    valueMap = createValueBuffer();
    count.set( numIncomingPaths );

    next.complete( this );
  }
}