001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.tuple.hadoop;
023
024import java.util.ArrayList;
025import java.util.LinkedHashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.Properties;
029
030import cascading.property.Props;
031import cascading.tuple.Tuple;
032import cascading.util.Util;
033
034/**
035 * Class TupleSerializationProps is a fluent interface for building properties to be passed to a
036 * {@link cascading.flow.FlowConnector} before creating new {@link cascading.flow.Flow} instances.
037 * <p>
038 * See {@link TupleSerialization} for details on these properties.
039 *
040 * @see TupleSerialization
041 */
042public class TupleSerializationProps extends Props
043  {
044  public static final String SERIALIZATION_TOKENS = "cascading.serialization.tokens";
045  public static final String SERIALIZATION_COMPARISON_BITWISE_PREVENT = "cascading.serialization.comparison.bitwise.prevent";
046  public static final String IGNORE_TYPES = "cascading.serialization.types.ignored";
047  public static final String REQUIRE_TYPES = "cascading.serialization.types.required";
048  public static final String HADOOP_IO_SERIALIZATIONS = "io.serializations";
049
050  Map<Integer, String> serializationTokens = new LinkedHashMap<Integer, String>();
051  List<String> hadoopSerializations = new ArrayList<String>();
052  Boolean ignoreTypes;
053  Boolean requireTypes;
054  Boolean preventBitWiseComparisons;
055
056  /**
057   * Adds the given token and className pair as a serialization token property. During object serialization and deserialization,
058   * the given token will be used instead of the className when an instance of the className is encountered.
059   *
060   * @param properties of type Map
061   * @param token      of type int
062   * @param className  of type String
063   */
064  public static void addSerializationToken( Map<Object, Object> properties, int token, String className )
065    {
066    String tokens = getSerializationTokens( properties );
067
068    properties.put( SERIALIZATION_TOKENS, Util.join( ",", Util.removeNulls( tokens, token + "=" + className ) ) );
069    }
070
071  /**
072   * Returns the serialization tokens property.
073   *
074   * @param properties of type Map
075   * @return returns a String
076   */
077  public static String getSerializationTokens( Map<Object, Object> properties )
078    {
079    return (String) properties.get( SERIALIZATION_TOKENS );
080    }
081
082  /**
083   * Adds the given className as a Hadoop IO serialization class.
084   *
085   * @param properties of type Map
086   * @param className  of type String
087   */
088  public static void addSerialization( Map<Object, Object> properties, String className )
089    {
090    String serializations = (String) properties.get( HADOOP_IO_SERIALIZATIONS );
091
092    properties.put( HADOOP_IO_SERIALIZATIONS, Util.join( ",", Util.removeNulls( serializations, className ) ) );
093    }
094
095  /**
096   * Creates a new TupleSerializationProps instance.
097   *
098   * @return TupleSerializationProps instance
099   */
100  public static TupleSerializationProps tupleSerializationProps()
101    {
102    return new TupleSerializationProps();
103    }
104
105  public TupleSerializationProps()
106    {
107    }
108
109  public Map<Integer, String> getSerializationTokens()
110    {
111    return serializationTokens;
112    }
113
114  /**
115   * Method setSerializationTokens sets the given integer tokens and classNames Map as a serialization properties.
116   * <p>
117   * During object serialization and deserialization, the given tokens will be used instead of the className when an
118   * instance of the className is encountered.
119   *
120   * @param serializationTokens Map of Integer tokens and String classnames
121   * @return this
122   */
123  public TupleSerializationProps setSerializationTokens( Map<Integer, String> serializationTokens )
124    {
125    this.serializationTokens = serializationTokens;
126
127    return this;
128    }
129
130  /**
131   * Method addSerializationTokens adds the given integer tokens and classNames Map as a serialization properties.
132   * <p>
133   * During object serialization and deserialization, the given tokens will be used instead of the className when an
134   * instance of the className is encountered.
135   *
136   * @param serializationTokens Map of Integer tokens and String classnames
137   * @return this
138   */
139  public TupleSerializationProps addSerializationTokens( Map<Integer, String> serializationTokens )
140    {
141    this.serializationTokens.putAll( serializationTokens );
142
143    return this;
144    }
145
146  /**
147   * Method addSerializationToken adds the given integer token and classNames as a serialization properties.
148   * <p>
149   * During object serialization and deserialization, the given tokens will be used instead of the className when an
150   * instance of the className is encountered.
151   *
152   * @param token                  type int
153   * @param serializationClassName type String
154   * @return this
155   */
156  public TupleSerializationProps addSerializationToken( int token, String serializationClassName )
157    {
158    this.serializationTokens.put( token, serializationClassName );
159
160    return this;
161    }
162
163  public List<String> getHadoopSerializations()
164    {
165    return hadoopSerializations;
166    }
167
168  /**
169   * Method setHadoopSerializations sets the Hadoop serialization classNames to be used as properties.
170   *
171   * @param hadoopSerializationClassNames List of classNames
172   * @return this
173   */
174  public TupleSerializationProps setHadoopSerializations( List<String> hadoopSerializationClassNames )
175    {
176    this.hadoopSerializations = hadoopSerializationClassNames;
177
178    return this;
179    }
180
181  /**
182   * Method addHadoopSerializations adds the Hadoop serialization classNames to be used as properties.
183   *
184   * @param hadoopSerializationClassNames List of classNames
185   * @return this
186   */
187  public TupleSerializationProps addHadoopSerializations( List<String> hadoopSerializationClassNames )
188    {
189    this.hadoopSerializations.addAll( hadoopSerializationClassNames );
190
191    return this;
192    }
193
194  /**
195   * Method addHadoopSerialization adds a Hadoop serialization className to be used as properties.
196   *
197   * @param hadoopSerializationClassName List of classNames
198   * @return this
199   */
200  public TupleSerializationProps addHadoopSerialization( String hadoopSerializationClassName )
201    {
202    this.hadoopSerializations.add( hadoopSerializationClassName );
203
204    return this;
205    }
206
207  public Boolean getIgnoreTypes()
208    {
209    return ignoreTypes;
210    }
211
212  /**
213   * Method setIgnoreTypes forces the {@link TupleSerialization} class to ignore any and all
214   * declared types causing the serialization to write each type or {@link SerializationToken}
215   * per {@link Tuple} element.
216   * <p>
217   * This disables the declared type optimizations.
218   * <p>
219   * See {@link #setRequireTypes(Boolean)} to force a failure if field type information is missing.
220   *
221   * @param ignoreTypes
222   * @return
223   */
224  public TupleSerializationProps setIgnoreTypes( Boolean ignoreTypes )
225    {
226    this.ignoreTypes = ignoreTypes;
227
228    return this;
229    }
230
231  public Boolean getRequireTypes()
232    {
233    return requireTypes;
234    }
235
236  /**
237   * Method setRequireTypes forces {@link TupleSerialization} to fail if field types are not declared.
238   * <p>
239   * This ensures the field type optimizations are leveraged.
240   * <p>
241   * See {@link #setIgnoreTypes(Boolean)} to force field type information to be discarded.
242   *
243   * @param requireTypes
244   * @return
245   */
246  public TupleSerializationProps setRequireTypes( Boolean requireTypes )
247    {
248    this.requireTypes = requireTypes;
249
250    return this;
251    }
252
253  /**
254   * Method preventBitWiseComparison will enable/disable bitwise comparisons of grouping keys
255   * during ordered partitioning ({@link cascading.pipe.GroupBy} and {@link cascading.pipe.CoGroup}).
256   * <p>
257   * If natural ordering of grouping/join keys is required, disable bit wise comparisons. They are enabled
258   * by default (subject to the below conditions).
259   * <p>
260   * Bit wise comparisons will only apply if the {@link cascading.tuple.Fields} used in the grouping/join are
261   * declared and no custom {@link java.util.Comparator} instances are provided on the grouping/key Fields, or
262   * no secondary sorting is being performed on a GroupBy.
263   *
264   * @param preventBitWiseComparisons set to true to disable bit wise comparisons
265   * @return this
266   */
267  public TupleSerializationProps preventBitWiseComparison( boolean preventBitWiseComparisons )
268    {
269    this.preventBitWiseComparisons = preventBitWiseComparisons;
270
271    return this;
272    }
273
274  public boolean getPreventBitWiseComparisons()
275    {
276    return preventBitWiseComparisons;
277    }
278
279  @Override
280  protected void addPropertiesTo( Properties properties )
281    {
282    for( Map.Entry<Integer, String> entry : serializationTokens.entrySet() )
283      addSerializationToken( properties, entry.getKey(), entry.getValue() );
284
285    for( String hadoopSerialization : hadoopSerializations )
286      addSerialization( properties, hadoopSerialization );
287
288    if( ignoreTypes != null )
289      properties.put( IGNORE_TYPES, ignoreTypes.toString() );
290
291    if( requireTypes != null )
292      properties.put( REQUIRE_TYPES, requireTypes.toString() );
293
294    if( preventBitWiseComparisons != null )
295      properties.put( SERIALIZATION_COMPARISON_BITWISE_PREVENT, preventBitWiseComparisons.toString() );
296    }
297  }