/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.tuple.collect;

import java.util.Collection;
import java.util.HashMap;

import cascading.flow.FlowProcess;
import cascading.tuple.Tuple;

import static cascading.tuple.collect.SpillableProps.defaultMapInitialCapacity;
import static cascading.tuple.collect.SpillableProps.defaultMapLoadFactor;

/**
 * SpillableTupleMap is a HashMap that will allow for multiple values per key, and if the number of values for a given
 * key reaches a specific threshold, they will be spilled to disk using a {@link SpillableTupleList} instance. Only
 * values are spilled, keys are not spilled and too many keys can result in an {@link OutOfMemoryError}.
 * <p>
 * The {@link cascading.tuple.collect.SpillableProps#MAP_THRESHOLD} value sets the number of total values that this map will
 * strive to keep in memory regardless of the number of keys. This is achieved by dynamically calculating the threshold
 * used by each child SpillableTupleList instance using
 * {@code threshold = Min( list_threshold, map_threshold / current_num_keys ) }.
 * <p>
 * To set the list threshold, see {@link cascading.tuple.collect.SpillableProps} fluent helper class.
 * <p>
 * This class is used by the {@link cascading.pipe.HashJoin} pipe, to set properties specific to a given
 * join instance, see the {@link cascading.pipe.HashJoin#getConfigDef()} method.
 * <p>
 * Note that {@link #get(Object)} never returns {@code null}: looking up a missing key creates and stores
 * a new collection for that key.
 *
 * @see cascading.tuple.hadoop.collect.HadoopSpillableTupleMap
 */
public abstract class SpillableTupleMap extends HashMap<Tuple, Collection<Tuple>> implements Spillable
  {
  /** Map wide threshold, as handed to the constructor. */
  private final int mapThreshold;
  /** Initial per-list threshold, as handed to the constructor. */
  private final int initListThreshold;
  /** Listener exposed to sub-classes; never {@code null}, see {@link #setSpillListener(Spillable.SpillListener)}. */
  private Spillable.SpillListener spillListener = Spillable.SpillListener.NULL;

  /**
   * Reads the {@link SpillableProps#MAP_THRESHOLD} property from the given process.
   *
   * @param flowProcess  the process to read the property from
   * @param defaultValue the value returned when the property is missing or blank
   * @return the parsed property value, or {@code defaultValue}
   * @throws NumberFormatException if the property is set but is not a valid integer
   */
  public static int getMapThreshold( FlowProcess flowProcess, int defaultValue )
    {
    String value = asTrimmedString( flowProcess.getProperty( SpillableProps.MAP_THRESHOLD ) );

    if( value == null )
      return defaultValue;

    return Integer.parseInt( value );
    }

  /**
   * Reads the {@link SpillableProps#MAP_CAPACITY} property from the given process.
   *
   * @param flowProcess  the process to read the property from
   * @param defaultValue the value returned when the property is missing or blank
   * @return the parsed property value, or {@code defaultValue}
   * @throws NumberFormatException if the property is set but is not a valid integer
   */
  public static int getMapCapacity( FlowProcess flowProcess, int defaultValue )
    {
    String value = asTrimmedString( flowProcess.getProperty( SpillableProps.MAP_CAPACITY ) );

    if( value == null )
      return defaultValue;

    return Integer.parseInt( value );
    }

  /**
   * Reads the {@link SpillableProps#MAP_LOADFACTOR} property from the given process.
   *
   * @param flowProcess  the process to read the property from
   * @param defaultValue the value returned when the property is missing or blank
   * @return the parsed property value, or {@code defaultValue}
   * @throws NumberFormatException if the property is set but is not a valid float
   */
  public static float getMapLoadFactor( FlowProcess flowProcess, float defaultValue )
    {
    String value = asTrimmedString( flowProcess.getProperty( SpillableProps.MAP_LOADFACTOR ) );

    if( value == null )
      return defaultValue;

    return Float.parseFloat( value );
    }

  /**
   * Normalizes a raw property value into a non-blank, trimmed String.
   * <p>
   * {@code toString()} is used instead of a cast so that a value stored as a non-String object
   * (for example an {@code Integer}) does not fail with a {@link ClassCastException}.
   *
   * @param value the raw property value, may be {@code null}
   * @return the trimmed text, or {@code null} if the value is {@code null} or blank
   */
  private static String asTrimmedString( Object value )
    {
    if( value == null )
      return null;

    String text = value.toString().trim();

    return text.isEmpty() ? null : text;
    }

  /**
   * Creates a map using the default initial capacity and load factor from {@link SpillableProps}.
   *
   * @param mapThreshold      the map wide threshold, see {@link #getMapThreshold()}
   * @param initListThreshold the initial per-list threshold, see {@link #getInitListThreshold()}
   */
  public SpillableTupleMap( int mapThreshold, int initListThreshold )
    {
    super( defaultMapInitialCapacity, defaultMapLoadFactor );
    this.mapThreshold = mapThreshold;
    this.initListThreshold = initListThreshold;
    }

  /**
   * Creates a map with an explicit initial capacity and load factor for the underlying HashMap.
   *
   * @param initialCapacity   the initial capacity handed to {@link HashMap}
   * @param loadFactor        the load factor handed to {@link HashMap}
   * @param mapThreshold      the map wide threshold, see {@link #getMapThreshold()}
   * @param initListThreshold the initial per-list threshold, see {@link #getInitListThreshold()}
   */
  public SpillableTupleMap( int initialCapacity, float loadFactor, int mapThreshold, int initListThreshold )
    {
    super( initialCapacity, loadFactor );
    this.mapThreshold = mapThreshold;
    this.initListThreshold = initListThreshold;
    }

  /**
   * @return the map threshold given at construction
   */
  protected int getMapThreshold()
    {
    return mapThreshold;
    }

  /**
   * @return the initial list threshold given at construction
   */
  public int getInitListThreshold()
    {
    return initListThreshold;
    }

  /**
   * Returns the collection stored for the given key, creating and storing a new one via
   * {@link #createTupleCollection(Tuple)} if there is none yet.
   * <p>
   * As a consequence a lookup of an unknown key adds an entry to this map.
   *
   * @param object the key, must be a {@link Tuple} when no value is stored for it yet
   * @return the existing or newly created collection
   * @throws ClassCastException if no value is stored for the key and the key is not a {@link Tuple}
   */
  @Override
  public Collection<Tuple> get( Object object )
    {
    Collection<Tuple> value = super.get( object );

    if( value != null )
      return value;

    // the cast only happens on a miss, exactly where a new entry has to be keyed by a Tuple
    Tuple key = (Tuple) object;

    value = createTupleCollection( key );

    super.put( key, value );

    return value;
    }

  /**
   * Creates the collection that will hold the values for the given key. Called by {@link #get(Object)}
   * when no value is stored for the key.
   *
   * @param object the key the new collection is created for
   * @return a new collection for the given key
   */
  protected abstract Collection<Tuple> createTupleCollection( Tuple object );

  /**
   * Does nothing in this class; the given grouping is ignored.
   */
  @Override
  public void setGrouping( Tuple group )
    {
    }

  /**
   * @return always {@code null} in this class
   */
  @Override
  public Tuple getGrouping()
    {
    return null;
    }

  /**
   * Does nothing in this class; the given strategy is ignored.
   */
  @Override
  public void setSpillStrategy( SpillStrategy spillStrategy )
    {
    }

  /**
   * @return always {@code 0} in this class
   */
  @Override
  public int spillCount()
    {
    return 0;
    }

  /**
   * @return the current spill listener, never {@code null}
   */
  public Spillable.SpillListener getSpillListener()
    {
    return spillListener;
    }

  /**
   * Sets the spill listener made available through {@link #getSpillListener()}.
   *
   * @param spillListener the listener to use; {@code null} restores the default
   *                      {@link Spillable.SpillListener#NULL} so that consumers of
   *                      {@link #getSpillListener()} never have to null-check
   */
  public void setSpillListener( Spillable.SpillListener spillListener )
    {
    this.spillListener = spillListener == null ? Spillable.SpillListener.NULL : spillListener;
    }
  }