001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tuple.collect;
023
024import java.util.Collection;
025import java.util.HashMap;
026
027import cascading.flow.FlowProcess;
028import cascading.tuple.Tuple;
029
030import static cascading.tuple.collect.SpillableProps.defaultMapInitialCapacity;
031import static cascading.tuple.collect.SpillableProps.defaultMapLoadFactor;
032
033/**
034 * SpillableTupleMap is a HashMap that will allow for multiple values per key, and if the number of values for a given
035 * key reach a specific threshold, they will be spilled to disk using a {@link SpillableTupleList} instance. Only
036 * values are spilled, keys are not spilled and too many keys can result in a {@link OutOfMemoryError}.
037 * <p>
038 * The {@link cascading.tuple.collect.SpillableProps#MAP_THRESHOLD} value sets the number of total values that this map will
039 * strive to keep in memory regardless of the number of keys. This is achieved by dynamically calculating the threshold
040 * used by each child SpillableTupleList instance using
041 * {@code threshold = Min( list_threshold, map_threshold / current_num_keys ) }.
042 * <p>
043 * To set the list threshold, see {@link cascading.tuple.collect.SpillableProps} fluent helper class.
044 * <p>
045 * This class is used by the {@link cascading.pipe.HashJoin} pipe, to set properties specific to a given
046 * join instance, see the {@link cascading.pipe.HashJoin#getConfigDef()} method.
047 *
048 * @see cascading.tuple.hadoop.collect.HadoopSpillableTupleMap
049 */
050public abstract class SpillableTupleMap extends HashMap<Tuple, Collection<Tuple>> implements Spillable
051  {
052  private int mapThreshold;
053  private int initListThreshold;
054  private Spillable.SpillListener spillListener = Spillable.SpillListener.NULL;
055
056  public static int getMapThreshold( FlowProcess flowProcess, int defaultValue )
057    {
058    String value = (String) flowProcess.getProperty( SpillableProps.MAP_THRESHOLD );
059
060    if( value == null || value.length() == 0 )
061      return defaultValue;
062
063    return Integer.parseInt( value );
064    }
065
066  public static int getMapCapacity( FlowProcess flowProcess, int defaultValue )
067    {
068    String value = (String) flowProcess.getProperty( SpillableProps.MAP_CAPACITY );
069
070    if( value == null || value.length() == 0 )
071      return defaultValue;
072
073    return Integer.parseInt( value );
074    }
075
076  public static float getMapLoadFactor( FlowProcess flowProcess, float defaultValue )
077    {
078    String value = (String) flowProcess.getProperty( SpillableProps.MAP_LOADFACTOR );
079
080    if( value == null || value.length() == 0 )
081      return defaultValue;
082
083    return Float.parseFloat( value );
084    }
085
086  public SpillableTupleMap( int mapThreshold, int initListThreshold )
087    {
088    super( defaultMapInitialCapacity, defaultMapLoadFactor );
089    this.mapThreshold = mapThreshold;
090    this.initListThreshold = initListThreshold;
091    }
092
093  public SpillableTupleMap( int initialCapacity, float loadFactor, int mapThreshold, int initListThreshold )
094    {
095    super( initialCapacity, loadFactor );
096    this.mapThreshold = mapThreshold;
097    this.initListThreshold = initListThreshold;
098    }
099
100  protected int getMapThreshold()
101    {
102    return mapThreshold;
103    }
104
105  public int getInitListThreshold()
106    {
107    return initListThreshold;
108    }
109
110  @Override
111  public Collection<Tuple> get( Object object )
112    {
113    Collection<Tuple> value = super.get( object );
114
115    if( value == null )
116      {
117      value = createTupleCollection( (Tuple) object );
118
119      super.put( (Tuple) object, value );
120      }
121
122    return value;
123    }
124
125  protected abstract Collection<Tuple> createTupleCollection( Tuple object );
126
127  @Override
128  public void setGrouping( Tuple group )
129    {
130    }
131
132  @Override
133  public Tuple getGrouping()
134    {
135    return null;
136    }
137
138  @Override
139  public void setSpillStrategy( SpillStrategy spillStrategy )
140    {
141    }
142
143  @Override
144  public int spillCount()
145    {
146    return 0;
147    }
148
149  public Spillable.SpillListener getSpillListener()
150    {
151    return spillListener;
152    }
153
154  public void setSpillListener( Spillable.SpillListener spillListener )
155    {
156    this.spillListener = spillListener;
157    }
158  }