001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.hadoop;
022    
023    import java.util.Arrays;
024    import java.util.Collection;
025    import java.util.Iterator;
026    import java.util.NoSuchElementException;
027    
028    import cascading.flow.FlowProcess;
029    import cascading.flow.hadoop.util.FalseCollection;
030    import cascading.provider.FactoryLoader;
031    import cascading.tuple.Fields;
032    import cascading.tuple.Tuple;
033    import cascading.tuple.Tuples;
034    import cascading.tuple.collect.Spillable;
035    import cascading.tuple.collect.SpillableTupleList;
036    import cascading.tuple.collect.TupleCollectionFactory;
037    import cascading.tuple.hadoop.collect.HadoopTupleCollectionFactory;
038    import cascading.tuple.io.IndexTuple;
039    import cascading.tuple.util.TupleViews;
040    import org.apache.hadoop.mapred.JobConf;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
044    import static cascading.tuple.collect.TupleCollectionFactory.TUPLE_COLLECTION_FACTORY;
045    
046    /** Class CoGroupClosure is used internally to represent co-grouping results of multiple tuple streams. */
047    public class HadoopCoGroupClosure extends HadoopGroupByClosure
048      {
049      /** Field LOG */
050      private static final Logger LOG = LoggerFactory.getLogger( HadoopCoGroupClosure.class );
051    
052      public enum Spill
053        {
054          Num_Spills_Written, Num_Spills_Read, Num_Tuples_Spilled, Duration_Millis_Written
055        }
056    
057      private class SpillListener implements Spillable.SpillListener
058        {
059        private final FlowProcess flowProcess;
060        private final Fields joinField;
061    
062        public SpillListener( FlowProcess flowProcess, Fields joinField )
063          {
064          this.flowProcess = flowProcess;
065          this.joinField = joinField;
066          }
067    
068        @Override
069        public void notifyWriteSpillBegin( Spillable spillable, int spillSize, String spillReason )
070          {
071          int numFiles = spillable.spillCount();
072    
073          if( numFiles % 10 == 0 )
074            {
075            LOG.info( "spilling group: {}, on grouping: {}, num times: {}, with reason: {}",
076              new Object[]{joinField.printVerbose(), spillable.getGrouping().print(), numFiles + 1, spillReason} );
077    
078            Runtime runtime = Runtime.getRuntime();
079            long freeMem = runtime.freeMemory() / 1024 / 1024;
080            long maxMem = runtime.maxMemory() / 1024 / 1024;
081            long totalMem = runtime.totalMemory() / 1024 / 1024;
082    
083            LOG.info( "mem on spill (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem );
084            }
085    
086          LOG.info( "spilling {} tuples in list to file number {}", spillSize, numFiles + 1 );
087    
088          flowProcess.increment( Spill.Num_Spills_Written, 1 );
089          flowProcess.increment( Spill.Num_Tuples_Spilled, spillSize );
090          }
091    
092        @Override
093        public void notifyWriteSpillEnd( SpillableTupleList spillableTupleList, long duration )
094          {
095          flowProcess.increment( Spill.Duration_Millis_Written, duration );
096          }
097    
098        @Override
099        public void notifyReadSpillBegin( Spillable spillable )
100          {
101          flowProcess.increment( Spill.Num_Spills_Read, 1 );
102          }
103        }
104    
105      /** Field groups */
106      Collection<Tuple>[] collections;
107      private final int numSelfJoins;
108    
109      private Tuple[] joinedTuplesArray;
110      private final Tuple emptyTuple;
111      private TupleBuilder joinedBuilder;
112      private Tuple joinedTuple = new Tuple(); // is discarded
113    
114      private final TupleCollectionFactory<JobConf> tupleCollectionFactory;
115    
116      public HadoopCoGroupClosure( FlowProcess flowProcess, int numSelfJoins, Fields[] groupingFields, Fields[] valueFields )
117        {
118        super( flowProcess, groupingFields, valueFields );
119        this.numSelfJoins = numSelfJoins;
120    
121        this.emptyTuple = Tuple.size( groupingFields[ 0 ].size() );
122    
123        FactoryLoader loader = FactoryLoader.getInstance();
124    
125        this.tupleCollectionFactory = loader.loadFactoryFrom( flowProcess, TUPLE_COLLECTION_FACTORY, HadoopTupleCollectionFactory.class );
126    
127        initLists();
128        }
129    
130      @Override
131      public int size()
132        {
133        return Math.max( joinFields.length, numSelfJoins + 1 );
134        }
135    
136      @Override
137      public Iterator<Tuple> getIterator( int pos )
138        {
139        if( pos < 0 || pos >= collections.length )
140          throw new IllegalArgumentException( "invalid group position: " + pos );
141    
142        return makeIterator( pos, collections[ pos ].iterator() );
143        }
144    
145      @Override
146      public Tuple getGroupTuple( Tuple keysTuple )
147        {
148        Tuples.asModifiable( joinedTuple );
149    
150        for( int i = 0; i < collections.length; i++ )
151          joinedTuplesArray[ i ] = collections[ i ].isEmpty() ? emptyTuple : keysTuple;
152    
153        joinedTuple = joinedBuilder.makeResult( joinedTuplesArray );
154    
155        return joinedTuple;
156        }
157    
158      @Override
159      public boolean isEmpty( int pos )
160        {
161        return collections[ pos ].isEmpty();
162        }
163    
164      @Override
165      public void reset( Tuple grouping, Iterator values )
166        {
167        super.reset( grouping, values );
168    
169        build();
170        }
171    
172      private void build()
173        {
174        clearGroups();
175    
176        if( collections[ 0 ] instanceof FalseCollection ) // force reset on FalseCollection
177          ( (FalseCollection) collections[ 0 ] ).setIterator( null );
178    
179        while( values.hasNext() )
180          {
181          IndexTuple current = (IndexTuple) values.next();
182          int pos = current.getIndex();
183    
184          // if this is the first (lhs) co-group, just use values iterator
185          // we are guaranteed all the remainder tuples in the iterator are from pos == 0
186          if( numSelfJoins == 0 && pos == 0 )
187            {
188            ( (FalseCollection) collections[ 0 ] ).setIterator( createIterator( current, values ) );
189            break;
190            }
191    
192          collections[ pos ].add( current.getTuple() ); // get the value tuple for this cogroup
193          }
194        }
195    
196      private void clearGroups()
197        {
198        for( Collection<Tuple> collection : collections )
199          {
200          collection.clear();
201    
202          if( collection instanceof Spillable )
203            ( (Spillable) collection ).setGrouping( grouping );
204          }
205        }
206    
207      private void initLists()
208        {
209        collections = new Collection[ size() ];
210    
211        // handle self joins
212        if( numSelfJoins != 0 )
213          {
214          Arrays.fill( collections, createTupleCollection( joinFields[ 0 ] ) );
215          }
216        else
217          {
218          collections[ 0 ] = new FalseCollection(); // we iterate this only once per grouping
219    
220          for( int i = 1; i < joinFields.length; i++ )
221            collections[ i ] = createTupleCollection( joinFields[ i ] );
222          }
223    
224        joinedBuilder = makeJoinedBuilder( joinFields );
225        joinedTuplesArray = new Tuple[ collections.length ];
226        }
227    
228      static interface TupleBuilder
229        {
230        Tuple makeResult( Tuple[] tuples );
231        }
232    
233      private TupleBuilder makeJoinedBuilder( final Fields[] joinFields )
234        {
235        final Fields[] fields = isSelfJoin() ? new Fields[ size() ] : joinFields;
236    
237        if( isSelfJoin() )
238          Arrays.fill( fields, 0, fields.length, joinFields[ 0 ] );
239    
240        return new TupleBuilder()
241        {
242        Tuple result = TupleViews.createComposite( fields );
243    
244        @Override
245        public Tuple makeResult( Tuple[] tuples )
246          {
247          return TupleViews.reset( result, tuples );
248          }
249        };
250        }
251    
252      private Collection<Tuple> createTupleCollection( Fields joinField )
253        {
254        Collection<Tuple> collection = tupleCollectionFactory.create( flowProcess );
255    
256        if( collection instanceof Spillable )
257          ( (Spillable) collection ).setSpillListener( createListener( joinField ) );
258    
259        return collection;
260        }
261    
262      private Spillable.SpillListener createListener( final Fields joinField )
263        {
264        return new SpillListener( flowProcess, joinField );
265        }
266    
267      public Iterator<Tuple> createIterator( final IndexTuple current, final Iterator<IndexTuple> values )
268        {
269        return new Iterator<Tuple>()
270        {
271        IndexTuple value = current;
272    
273        @Override
274        public boolean hasNext()
275          {
276          return value != null;
277          }
278    
279        @Override
280        public Tuple next()
281          {
282          if( value == null && !values.hasNext() )
283            throw new NoSuchElementException();
284    
285          Tuple result = value.getTuple();
286    
287          if( values.hasNext() )
288            value = values.next();
289          else
290            value = null;
291    
292          return result;
293          }
294    
295        @Override
296        public void remove()
297          {
298          // unsupported
299          }
300        };
301        }
302      }