001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.hadoop.collect;
022    
023    import java.util.Collection;
024    
025    import cascading.flow.FlowProcess;
026    import cascading.provider.FactoryLoader;
027    import cascading.tuple.Tuple;
028    import cascading.tuple.collect.Spillable;
029    import cascading.tuple.collect.SpillableTupleList;
030    import cascading.tuple.collect.SpillableTupleMap;
031    import cascading.tuple.collect.TupleCollectionFactory;
032    import cascading.tuple.collect.TupleMapFactory;
033    import org.apache.hadoop.mapred.JobConf;
034    
035    /**
036     * HadoopSpillableTupleMap is responsible for spilling values to disk if the map threshold is reached.
037     *
038     * @see SpillableTupleMap
039     * @see SpillableTupleList
040     */
041    public class HadoopSpillableTupleMap extends SpillableTupleMap
042      {
043      private final FlowProcess<JobConf> flowProcess;
044      private final Spillable.SpillStrategy spillStrategy;
045      private final TupleCollectionFactory<JobConf> tupleCollectionFactory;
046    
047      public HadoopSpillableTupleMap( int initialCapacity, float loadFactor, int mapThreshold, int listThreshold, FlowProcess<JobConf> flowProcess )
048        {
049        super( initialCapacity, loadFactor, mapThreshold, listThreshold );
050        this.flowProcess = flowProcess;
051        this.spillStrategy = getSpillStrategy();
052    
053        FactoryLoader loader = FactoryLoader.getInstance();
054    
055        this.tupleCollectionFactory = loader.loadFactoryFrom( flowProcess, TupleMapFactory.TUPLE_MAP_FACTORY, HadoopTupleCollectionFactory.class );
056        }
057    
058      @Override
059      protected Collection<Tuple> createTupleCollection( Tuple tuple )
060        {
061        Collection<Tuple> collection = tupleCollectionFactory.create( flowProcess );
062    
063        if( collection instanceof Spillable )
064          {
065          ( (Spillable) collection ).setGrouping( tuple );
066          ( (Spillable) collection ).setSpillListener( getSpillListener() );
067          ( (Spillable) collection ).setSpillStrategy( spillStrategy );
068          }
069    
070        return collection;
071        }
072    
073      /**
074       * Method getSpillStrategy returns a SpillStrategy instance that is passed to the underlying Spillable
075       * tuple collection.
076       *
077       * @return of type Spillable#SpillStrategy
078       */
079      protected Spillable.SpillStrategy getSpillStrategy()
080        {
081        return new Spillable.SpillStrategy()
082        {
083        int minThreshold = (int) ( getMapThreshold() * .05 );
084    
085        int current()
086          {
087          return Math.max( minThreshold, Math.min( getInitListThreshold(), getMapThreshold() / size() ) );
088          }
089    
090        @Override
091        public boolean doSpill( Spillable spillable, int size )
092          {
093          return current() <= size;
094          }
095    
096        @Override
097        public String getSpillReason( Spillable spillable )
098          {
099          return "met current threshold: " + current();
100          }
101        };
102        }
103      }