001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.tuple.hadoop.collect; 022 023import java.util.Collection; 024 025import cascading.flow.FlowProcess; 026import cascading.provider.FactoryLoader; 027import cascading.tuple.Tuple; 028import cascading.tuple.collect.Spillable; 029import cascading.tuple.collect.SpillableTupleList; 030import cascading.tuple.collect.SpillableTupleMap; 031import cascading.tuple.collect.TupleCollectionFactory; 032import cascading.tuple.collect.TupleMapFactory; 033import org.apache.hadoop.conf.Configuration; 034 035/** 036 * HadoopSpillableTupleMap is responsible for spilling values to disk if the map threshold is reached. 037 * 038 * @see SpillableTupleMap 039 * @see SpillableTupleList 040 */ 041public class HadoopSpillableTupleMap extends SpillableTupleMap 042 { 043 private final FlowProcess<? extends Configuration> flowProcess; 044 private final Spillable.SpillStrategy spillStrategy; 045 private final TupleCollectionFactory<Configuration> tupleCollectionFactory; 046 047 public HadoopSpillableTupleMap( int initialCapacity, float loadFactor, int mapThreshold, int listThreshold, FlowProcess<? extends Configuration> flowProcess ) 048 { 049 super( initialCapacity, loadFactor, mapThreshold, listThreshold ); 050 this.flowProcess = flowProcess; 051 this.spillStrategy = getSpillStrategy(); 052 053 FactoryLoader loader = FactoryLoader.getInstance(); 054 055 this.tupleCollectionFactory = loader.loadFactoryFrom( flowProcess, TupleMapFactory.TUPLE_MAP_FACTORY, HadoopTupleCollectionFactory.class ); 056 } 057 058 @Override 059 protected Collection<Tuple> createTupleCollection( Tuple tuple ) 060 { 061 Collection<Tuple> collection = tupleCollectionFactory.create( flowProcess ); 062 063 if( collection instanceof Spillable ) 064 { 065 ( (Spillable) collection ).setGrouping( tuple ); 066 ( (Spillable) collection ).setSpillListener( getSpillListener() ); 067 ( (Spillable) collection ).setSpillStrategy( spillStrategy ); 068 } 069 070 return collection; 071 } 072 073 /** 074 * Method getSpillStrategy returns a SpillStrategy instance that is passed to the underlying Spillable 075 * tuple collection. 076 * 077 * @return of type Spillable#SpillStrategy 078 */ 079 protected Spillable.SpillStrategy getSpillStrategy() 080 { 081 return new Spillable.SpillStrategy() 082 { 083 int minThreshold = (int) ( getMapThreshold() * .05 ); 084 085 int current() 086 { 087 return Math.max( minThreshold, Math.min( getInitListThreshold(), getMapThreshold() / size() ) ); 088 } 089 090 @Override 091 public boolean doSpill( Spillable spillable, int size ) 092 { 093 return current() <= size; 094 } 095 096 @Override 097 public String getSpillReason( Spillable spillable ) 098 { 099 return "met current threshold: " + current(); 100 } 101 }; 102 } 103 }