001    /*
002     * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.stream;
022    
023    import java.util.ArrayList;
024    import java.util.Collection;
025    import java.util.Collections;
026    import java.util.Comparator;
027    import java.util.HashMap;
028    import java.util.IdentityHashMap;
029    import java.util.Map;
030    import java.util.Set;
031    import java.util.TreeSet;
032    import java.util.concurrent.atomic.AtomicInteger;
033    
034    import cascading.flow.FlowProcess;
035    import cascading.flow.FlowProps;
036    import cascading.flow.planner.Scope;
037    import cascading.pipe.Splice;
038    import cascading.tuple.Fields;
039    import cascading.tuple.Tuple;
040    import cascading.tuple.util.TupleBuilder;
041    import cascading.tuple.util.TupleHasher;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    
045    import static cascading.tuple.util.TupleViews.createNarrow;
046    
047    /**
048     *
049     */
050    public abstract class MemorySpliceGate extends SpliceGate
051      {
052      private static final Logger LOG = LoggerFactory.getLogger( MemorySpliceGate.class );
053    
054      protected final Map<Duct, Integer> posMap = new IdentityHashMap<Duct, Integer>();
055    
056      protected Comparator<Tuple>[] groupComparators;
057      protected Comparator<Tuple>[] valueComparators;
058      protected TupleHasher groupHasher;
059      protected boolean nullsAreNotEqual;
060    
061      protected Set<Tuple> keys;
062      protected Map<Tuple, Collection<Tuple>>[] keyValues;
063    
064      protected MemoryCoGroupClosure closure;
065    
066      protected int numIncomingPaths;
067    
068      protected final AtomicInteger count = new AtomicInteger( 0 );
069    
070      public MemorySpliceGate( FlowProcess flowProcess, Splice splice )
071        {
072        super( flowProcess, splice );
073        }
074    
075      @Override
076      public void bind( StreamGraph streamGraph )
077        {
078        super.bind( streamGraph ); // finds allPrevious
079    
080        numIncomingPaths = streamGraph.countAllEventingPathsTo( this );
081    
082        orderDucts( streamGraph );
083        }
084    
085      // we must make a new Tuple instance to wrap the incoming copy
086      protected TupleBuilder createDefaultNarrowBuilder( final Fields incomingFields, final Fields narrowFields )
087        {
088        return new TupleBuilder()
089        {
090        int[] pos = incomingFields.getPos( narrowFields );
091    
092        @Override
093        public Tuple makeResult( Tuple input, Tuple output )
094          {
095          return createNarrow( pos, input );
096          }
097        };
098        }
099    
100      @Override
101      public void initialize()
102        {
103        super.initialize();
104    
105        Comparator defaultComparator = (Comparator) flowProcess.newInstance( (String) flowProcess.getProperty( FlowProps.DEFAULT_ELEMENT_COMPARATOR ) );
106    
107        Fields[] compareFields = new Fields[ orderedPrevious.length ];
108        groupComparators = new Comparator[ orderedPrevious.length ];
109    
110        if( splice.isSorted() )
111          valueComparators = new Comparator[ orderedPrevious.length ];
112    
113        int size = splice.isGroupBy() ? 1 : incomingScopes.size();
114    
115        for( int i = 0; i < size; i++ )
116          {
117          Scope incomingScope = incomingScopes.get( i );
118    
119          int pos = splice.isGroupBy() ? 0 : splice.getPipePos().get( incomingScope.getName() );
120    
121          // we want the comparators
122          Fields groupFields = splice.getKeySelectors().get( incomingScope.getName() );
123    
124          compareFields[ pos ] = groupFields; // used for finding hashers
125    
126          if( groupFields.size() == 0 )
127            groupComparators[ pos ] = groupFields;
128          else
129            groupComparators[ pos ] = new SparseTupleComparator( Fields.asDeclaration( groupFields ), defaultComparator );
130    
131          groupComparators[ pos ] = splice.isSortReversed() ? Collections.reverseOrder( groupComparators[ pos ] ) : groupComparators[ pos ];
132    
133          if( sortFields != null )
134            {
135            // we want the comparators, so don't use sortFields array
136            Fields sortFields = splice.getSortingSelectors().get( incomingScope.getName() );
137            valueComparators[ pos ] = new SparseTupleComparator( valuesFields[ pos ], sortFields, defaultComparator );
138    
139            if( splice.isSortReversed() )
140              valueComparators[ pos ] = Collections.reverseOrder( valueComparators[ pos ] );
141            }
142          }
143    
144        nullsAreNotEqual = !areNullsEqual();
145    
146        if( nullsAreNotEqual )
147          LOG.debug( "treating null values in Tuples at not equal during in memory grouping" );
148    
149        Comparator[] hashers = TupleHasher.merge( compareFields );
150        groupHasher = defaultComparator != null || !TupleHasher.isNull( hashers ) ? new TupleHasher( defaultComparator, hashers ) : null;
151    
152        keys = createKeySet();
153    
154        count.set( numIncomingPaths ); // the number of paths incoming
155        }
156    
157      private boolean areNullsEqual()
158        {
159        try
160          {
161          Tuple tupleWithNull = Tuple.size( 1 );
162    
163          return groupComparators[ 0 ].compare( tupleWithNull, tupleWithNull ) == 0;
164          }
165        catch( Exception exception )
166          {
167          return true; // assume we have an npe or something and they don't expect to see nulls
168          }
169        }
170    
171      @Override
172      public void prepare()
173        {
174        super.prepare();
175    
176        keyValues = createKeyValuesArray();
177    
178        makePosMap( posMap );
179    
180        closure = new MemoryCoGroupClosure( flowProcess, splice.getNumSelfJoins(), keyFields, valuesFields );
181    
182        if( grouping != null && splice.getJoinDeclaredFields() != null && splice.getJoinDeclaredFields().isNone() )
183          grouping.joinerClosure = closure;
184        }
185    
186      protected Comparator getKeyComparator()
187        {
188        if( groupComparators.length > 0 && groupComparators[ 0 ] != null )
189          return groupComparators[ 0 ];
190    
191        return new Comparator<Comparable>()
192        {
193        @Override
194        public int compare( Comparable lhs, Comparable rhs )
195          {
196          return lhs.compareTo( rhs );
197          }
198        };
199        }
200    
201      protected Set<Tuple> createKeySet()
202        {
203        return Collections.synchronizedSet( new TreeSet<Tuple>( getKeyComparator() ) );
204        }
205    
206      /**
207       * This lets us just replace an old map and let the gc cleanup, vs clearing each map
208       *
209       * @return of type Map
210       */
211      protected Map<Tuple, Collection<Tuple>>[] createKeyValuesArray()
212        {
213        // Ducts use identity for equality
214        Map<Tuple, Collection<Tuple>>[] valueMap = new Map[ orderedPrevious.length ];
215    
216        int start = isBlockingStreamed() ? 0 : 1;
217        for( int i = start; i < orderedPrevious.length; i++ )
218          {
219          if( orderedPrevious[ i ] == null ) // only true for local mode
220            continue;
221    
222          valueMap[ i ] = createTupleMap();
223          }
224    
225        return valueMap;
226        }
227    
228      protected Map<Tuple, Collection<Tuple>> createTupleMap()
229        {
230        return new HashMap<Tuple, Collection<Tuple>>()
231        {
232        @Override
233        public Collection<Tuple> get( Object object )
234          {
235          Collection<Tuple> value = super.get( object );
236    
237          if( value == null )
238            {
239            value = new ArrayList<Tuple>();
240    
241            super.put( (Tuple) object, value );
242            }
243    
244          return value;
245          }
246        };
247        }
248    
249      /**
250       * This allows the tuple to honor the hasher and comparators, if any
251       *
252       * @param object the tuple to wrap
253       * @return a DelegatedTuple instance
254       */
255      protected final Tuple getDelegatedTuple( Tuple object )
256        {
257        if( groupHasher == null )
258          return object;
259    
260        return new DelegatedTuple( object );
261        }
262    
263      protected abstract boolean isBlockingStreamed();
264    
265      protected class DelegatedTuple extends Tuple
266        {
267        public DelegatedTuple( Tuple wrapped )
268          {
269          // pass it in to prevent one being allocated
270          super( Tuple.elements( wrapped ) );
271          }
272    
273        @Override
274        public boolean equals( Object object )
275          {
276          return compareTo( object ) == 0;
277          }
278    
279        @Override
280        public int compareTo( Object other )
281          {
282          return groupComparators[ 0 ].compare( this, (Tuple) other );
283          }
284    
285        @Override
286        public int hashCode()
287          {
288          return groupHasher.hashCode( this );
289          }
290        }
291      }