/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.stream;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

import cascading.flow.FlowProcess;
import cascading.flow.FlowProps;
import cascading.flow.planner.Scope;
import cascading.pipe.Splice;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.util.TupleBuilder;
import cascading.tuple.util.TupleHasher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.tuple.util.TupleViews.createNarrow;

/**
 * Base class for {@link SpliceGate} implementations that keep grouping keys and their
 * associated values in memory.
 * <p/>
 * {@link #initialize()} builds the per-input group (and optional value) comparators, the
 * group hasher and the key set; {@link #prepare()} builds the per-input key-to-values maps
 * and the {@link MemoryCoGroupClosure}. Subclasses state through {@link #isBlockingStreamed()}
 * whether input position 0 also receives its own key-to-values map.
 */
public abstract class MemorySpliceGate extends SpliceGate
{
  private static final Logger LOG = LoggerFactory.getLogger( MemorySpliceGate.class );

  /** Positions keyed by incoming duct; ducts use identity for equality. Filled in {@link #prepare()}. */
  protected final Map<Duct, Integer> posMap = new IdentityHashMap<Duct, Integer>();

  /** Grouping key comparators, indexed by input position. Built in {@link #initialize()}. */
  protected Comparator<Tuple>[] groupComparators;
  /** Value comparators, indexed by input position. Only allocated when the splice is sorted. */
  protected Comparator<Tuple>[] valueComparators;
  /** Hasher for grouping keys, or null when neither a default comparator nor field hashers exist. */
  protected TupleHasher groupHasher;
  /** True when the position 0 group comparator does not treat a tuple holding a null as equal to itself. */
  protected boolean nullsAreNotEqual;

  /** Grouping keys, ordered by {@link #getKeyComparator()}. */
  protected Set<Tuple> keys;
  /** Values collected per key, one map per input position; entries may be null, see {@link #createKeyValuesArray()}. */
  protected Map<Tuple, Collection<Tuple>>[] keyValues;

  protected MemoryCoGroupClosure closure;

  /** Number of eventing paths leading into this gate, as counted from the stream graph in {@link #bind(StreamGraph)}. */
  protected int numIncomingPaths;

  /** Set to {@link #numIncomingPaths} by {@link #initialize()}. */
  protected final AtomicInteger count = new AtomicInteger( 0 );

  public MemorySpliceGate( FlowProcess flowProcess, Splice splice )
  {
    super( flowProcess, splice );
  }

  /**
   * Binds this gate to the given graph, records the number of incoming eventing paths and
   * orders the incoming ducts.
   *
   * @param streamGraph the graph this gate participates in
   */
  @Override
  public void bind( StreamGraph streamGraph )
  {
    super.bind( streamGraph ); // finds allPrevious

    numIncomingPaths = streamGraph.countAllEventingPathsTo( this );

    orderDucts( streamGraph );
  }

  /**
   * Creates a builder that narrows an incoming tuple to the given fields.
   * <p/>
   * We must make a new Tuple instance to wrap the incoming copy, so the {@code output}
   * argument handed to the builder is ignored.
   *
   * @param incomingFields the fields of the incoming tuple
   * @param narrowFields   the subset of fields to expose
   * @return a builder returning a narrowed view over its input
   */
  protected TupleBuilder createDefaultNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
  {
    return new TupleBuilder()
    {
      // resolved once, reused for every tuple
      final int[] pos = incomingFields.getPos( narrowFields );

      @Override
      public Tuple makeResult( Tuple input, Tuple output )
      {
        return createNarrow( pos, input );
      }
    };
  }

  /**
   * Builds the group and value comparators for every incoming scope, detects how the group
   * comparator treats nulls, creates the group hasher and key set, and sets {@link #count}
   * to the number of incoming paths.
   */
  @Override
  public void initialize()
  {
    super.initialize();

    Comparator defaultComparator = (Comparator) flowProcess.newInstance( (String) flowProcess.getProperty( FlowProps.DEFAULT_ELEMENT_COMPARATOR ) );

    Fields[] compareFields = new Fields[ orderedPrevious.length ];
    groupComparators = new Comparator[ orderedPrevious.length ];

    if( splice.isSorted() )
      valueComparators = new Comparator[ orderedPrevious.length ];

    // a GroupBy only needs the comparators of its first incoming scope
    int size = splice.isGroupBy() ? 1 : incomingScopes.size();

    for( int i = 0; i < size; i++ )
    {
      Scope incomingScope = incomingScopes.get( i );

      int pos = splice.isGroupBy() ? 0 : splice.getPipePos().get( incomingScope.getName() );

      // we want the comparators
      Fields groupFields = splice.getKeySelectors().get( incomingScope.getName() );

      compareFields[ pos ] = groupFields; // used for finding hashers

      if( groupFields.size() == 0 )
        groupComparators[ pos ] = groupFields;
      else
        groupComparators[ pos ] = new SparseTupleComparator( Fields.asDeclaration( groupFields ), defaultComparator );

      if( splice.isSortReversed() )
        groupComparators[ pos ] = Collections.reverseOrder( groupComparators[ pos ] );

      if( sortFields != null )
      {
        // we want the comparators, so don't use sortFields array
        Fields sortSelector = splice.getSortingSelectors().get( incomingScope.getName() );
        valueComparators[ pos ] = new SparseTupleComparator( valuesFields[ pos ], sortSelector, defaultComparator );

        if( splice.isSortReversed() )
          valueComparators[ pos ] = Collections.reverseOrder( valueComparators[ pos ] );
      }
    }

    nullsAreNotEqual = !areNullsEqual();

    if( nullsAreNotEqual )
      LOG.debug( "treating null values in Tuples as not equal during in memory grouping" );

    Comparator[] hashers = TupleHasher.merge( compareFields );

    if( defaultComparator != null || !TupleHasher.isNull( hashers ) )
      groupHasher = new TupleHasher( defaultComparator, hashers );
    else
      groupHasher = null;

    keys = createKeySet();

    count.set( numIncomingPaths ); // the number of paths incoming
  }

  /**
   * Probes the position 0 group comparator with a single element tuple holding a null,
   * compared against itself.
   *
   * @return true when the comparator reports equality, or when the probe throws
   */
  private boolean areNullsEqual()
  {
    try
    {
      Tuple tupleWithNull = Tuple.size( 1 );

      return groupComparators[ 0 ].compare( tupleWithNull, tupleWithNull ) == 0;
    }
    catch( Exception exception )
    {
      return true; // assume we have an npe or something and they don't expect to see nulls
    }
  }

  /**
   * Creates the per-input value maps, the duct position map and the co-group closure, and
   * hands the closure to the grouping when the splice declares no join fields.
   */
  @Override
  public void prepare()
  {
    super.prepare();

    keyValues = createKeyValuesArray();

    makePosMap( posMap );

    closure = new MemoryCoGroupClosure( flowProcess, splice.getNumSelfJoins(), keyFields, valuesFields );

    if( grouping != null && splice.getJoinDeclaredFields() != null && splice.getJoinDeclaredFields().isNone() )
      grouping.joinerClosure = closure;
  }

  /**
   * Returns the comparator used to order grouping keys.
   *
   * @return the position 0 group comparator when available, otherwise a comparator that
   *         delegates to {@link Comparable#compareTo(Object)}
   */
  protected Comparator getKeyComparator()
  {
    if( groupComparators.length > 0 && groupComparators[ 0 ] != null )
      return groupComparators[ 0 ];

    return new Comparator<Comparable>()
    {
      @Override
      public int compare( Comparable lhs, Comparable rhs )
      {
        return lhs.compareTo( rhs );
      }
    };
  }

  /**
   * Creates the set holding the grouping keys.
   *
   * @return a synchronized, sorted set ordered by {@link #getKeyComparator()}
   */
  protected Set<Tuple> createKeySet()
  {
    return Collections.synchronizedSet( new TreeSet<Tuple>( getKeyComparator() ) );
  }

  /**
   * This lets us just replace an old map and let the gc cleanup, vs clearing each map
   * <p/>
   * Position 0 only receives a map when {@link #isBlockingStreamed()} is true, and positions
   * without an incoming duct are left null.
   *
   * @return of type Map
   */
  protected Map<Tuple, Collection<Tuple>>[] createKeyValuesArray()
  {
    Map<Tuple, Collection<Tuple>>[] valueMap = new Map[ orderedPrevious.length ];

    int start = isBlockingStreamed() ? 0 : 1;

    for( int i = start; i < orderedPrevious.length; i++ )
    {
      if( orderedPrevious[ i ] == null ) // only true for local mode
        continue;

      valueMap[ i ] = createTupleMap();
    }

    return valueMap;
  }

  /**
   * Creates a map whose {@code get} never returns null: a missing key is registered with a
   * new empty list, which is then returned.
   *
   * @return a new, empty key-to-values map
   */
  protected Map<Tuple, Collection<Tuple>> createTupleMap()
  {
    return new HashMap<Tuple, Collection<Tuple>>()
    {
      @Override
      public Collection<Tuple> get( Object object )
      {
        Collection<Tuple> value = super.get( object );

        if( value == null )
        {
          value = new ArrayList<Tuple>();

          super.put( (Tuple) object, value );
        }

        return value;
      }
    };
  }

  /**
   * This allows the tuple to honor the hasher and comparators, if any
   *
   * @param object the tuple to wrap
   * @return the given tuple when no group hasher is configured, otherwise a DelegatedTuple instance
   */
  protected final Tuple getDelegatedTuple( Tuple object )
  {
    if( groupHasher == null )
      return object;

    return new DelegatedTuple( object );
  }

  /**
   * Tells whether input position 0 is also collected into a key-to-values map.
   *
   * @return true when {@link #createKeyValuesArray()} should create a map for position 0
   */
  protected abstract boolean isBlockingStreamed();

  /**
   * A Tuple whose equality and ordering are decided by the position 0 group comparator and
   * whose hash code is computed by the group hasher of the enclosing gate.
   */
  protected class DelegatedTuple extends Tuple
  {
    public DelegatedTuple( Tuple wrapped )
    {
      // pass it in to prevent one being allocated
      super( Tuple.elements( wrapped ) );
    }

    /**
     * Compares via the group comparator. Anything that is not a Tuple, including null, is
     * never equal; this keeps the {@link Object#equals(Object)} contract instead of failing
     * with a ClassCastException.
     * <p/>
     * There is intentionally no identity shortcut, the comparator may report a tuple holding
     * nulls as not equal to itself.
     */
    @Override
    public boolean equals( Object object )
    {
      if( !( object instanceof Tuple ) )
        return false;

      return compareTo( object ) == 0;
    }

    @Override
    public int compareTo( Object other )
    {
      return groupComparators[ 0 ].compare( this, (Tuple) other );
    }

    @Override
    public int hashCode()
    {
      return groupHasher.hashCode( this );
    }
  }
}