001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tuple.collect;
023
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Properties;
027
028import cascading.property.Props;
029import cascading.util.Util;
030
031/**
032 * Class SpillableProps is a fluent interface for building properties to be passed to a
033 * {@link cascading.flow.FlowConnector} before creating new {@link cascading.flow.Flow} instances.
034 *
035 * @see SpillableTupleList
036 * @see SpillableTupleMap
037 */
038public class SpillableProps extends Props
039  {
040  /**
041   * Whether to enable compression of the spills or not, on by default.
042   *
043   * @see Boolean#parseBoolean(String)
044   */
045  public static final String SPILL_COMPRESS = "cascading.spill.compress";
046
047  /** A comma delimited list of possible codecs to try. This is platform dependent. */
048  public static final String SPILL_CODECS = "cascading.spill.codecs";
049
050  /** Number of tuples to hold in memory before spilling them to disk. */
051  public static final String LIST_THRESHOLD = "cascading.spill.list.threshold";
052
053  /** The total number of tuple values (not keys) to attempt to keep in memory. */
054  public static final String MAP_THRESHOLD = "cascading.spill.map.threshold";
055
056  /**
057   * The initial hash map capacity.
058   *
059   * @see java.util.HashMap
060   */
061  public static final String MAP_CAPACITY = "cascading.spill.map.capacity";
062
063  /**
064   * The initial hash map load factor.
065   *
066   * @see java.util.HashMap
067   */
068  public static final String MAP_LOADFACTOR = "cascading.spill.map.loadfactor";
069
070  public static final int defaultListThreshold = 10 * 1000;
071
072  public static final int defaultMapThreshold = 10 * 1000;
073  public static final int defaultMapInitialCapacity = 100 * 1000;
074  public static final float defaultMapLoadFactor = 0.75f;
075
076  boolean compressSpill = true;
077  List<String> codecs = new ArrayList<String>();
078
079  int listSpillThreshold = defaultListThreshold;
080
081  int mapSpillThreshold = defaultMapThreshold;
082  int mapInitialCapacity = defaultMapInitialCapacity;
083  float mapLoadFactor = defaultMapLoadFactor;
084
085  /**
086   * Creates a new SpillableProps instance.
087   *
088   * @return SpillableProps instance
089   */
090  public static SpillableProps spillableProps()
091    {
092    return new SpillableProps();
093    }
094
095  public SpillableProps()
096    {
097    }
098
099  public boolean isCompressSpill()
100    {
101    return compressSpill;
102    }
103
104  /**
105   * Method setCompressSpill either enables or disables spill compression. Enabled by default.
106   * <p>
107   * Spill compression relies on properly configured and available codecs. See {@link #setCodecs(java.util.List)}.
108   *
109   * @param compressSpill type boolean
110   * @return this
111   */
112  public SpillableProps setCompressSpill( boolean compressSpill )
113    {
114    this.compressSpill = compressSpill;
115
116    return this;
117    }
118
119  public List<String> getCodecs()
120    {
121    return codecs;
122    }
123
124  /**
125   * Method setCodecs sets list of possible codec class names to use. They will be loaded in order, if available.
126   * <p>
127   * This is platform dependent.
128   *
129   * @param codecs type list
130   * @return this
131   */
132  public SpillableProps setCodecs( List<String> codecs )
133    {
134    this.codecs = codecs;
135
136    return this;
137    }
138
139  /**
140   * Method addCodecs adds a list of possible codec class names to use. They will be loaded in order, if available.
141   * <p>
142   * This is platform dependent.
143   *
144   * @param codecs type list
145   */
146  public SpillableProps addCodecs( List<String> codecs )
147    {
148    this.codecs.addAll( codecs );
149
150    return this;
151    }
152
153  /**
154   * Method addCodec adds a codec class names to use.
155   * <p>
156   * This is platform dependent.
157   *
158   * @param codec type String
159   */
160  public SpillableProps addCodec( String codec )
161    {
162    this.codecs.add( codec );
163
164    return this;
165    }
166
167  public int getListSpillThreshold()
168    {
169    return listSpillThreshold;
170    }
171
172  /**
173   * Method setListSpillThreshold sets the number of tuples to hold in memory before spilling them to disk.
174   *
175   * @param listSpillThreshold of type int
176   * @return this
177   */
178  public SpillableProps setListSpillThreshold( int listSpillThreshold )
179    {
180    this.listSpillThreshold = listSpillThreshold;
181
182    return this;
183    }
184
185  public int getMapSpillThreshold()
186    {
187    return mapSpillThreshold;
188    }
189
190  /**
191   * Method setMapSpillThreshold the total number of tuple values (not keys) to attempt to keep in memory.
192   * <p>
193   * The default implementation cannot spill Map keys to disk.
194   *
195   * @param mapSpillThreshold of type int
196   * @return this
197   */
198  public SpillableProps setMapSpillThreshold( int mapSpillThreshold )
199    {
200    this.mapSpillThreshold = mapSpillThreshold;
201
202    return this;
203    }
204
205  public int getMapInitialCapacity()
206    {
207    return mapInitialCapacity;
208    }
209
210  /**
211   * Method setMapInitialCapacity sets the default capacity to be used by the backing Map implementation.
212   *
213   * @param mapInitialCapacity type int
214   * @return this
215   */
216  public SpillableProps setMapInitialCapacity( int mapInitialCapacity )
217    {
218    this.mapInitialCapacity = mapInitialCapacity;
219
220    return this;
221    }
222
223  public float getMapLoadFactor()
224    {
225    return mapLoadFactor;
226    }
227
228  /**
229   * Method setMapLoadFactor sets the default load factor to be used by the backing Map implementation.
230   *
231   * @param mapLoadFactor type float
232   * @return this
233   */
234  public SpillableProps setMapLoadFactor( float mapLoadFactor )
235    {
236    this.mapLoadFactor = mapLoadFactor;
237
238    return this;
239    }
240
241  @Override
242  protected void addPropertiesTo( Properties properties )
243    {
244    for( String codec : codecs )
245      {
246      String codecs = (String) properties.get( SPILL_CODECS );
247
248      properties.put( SPILL_CODECS, Util.join( ",", Util.removeNulls( codecs, codec ) ) );
249      }
250
251    properties.setProperty( SPILL_COMPRESS, Boolean.toString( compressSpill ) );
252    properties.setProperty( LIST_THRESHOLD, Integer.toString( listSpillThreshold ) );
253
254    properties.setProperty( MAP_THRESHOLD, Integer.toString( mapSpillThreshold ) );
255    properties.setProperty( MAP_CAPACITY, Integer.toString( mapInitialCapacity ) );
256    properties.setProperty( MAP_LOADFACTOR, Float.toString( mapLoadFactor ) );
257    }
258  }