001/*
002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.hash;
022
023import java.nio.charset.Charset;
024import java.security.MessageDigest;
025import java.security.NoSuchAlgorithmException;
026import java.util.Iterator;
027import java.util.WeakHashMap;
028
029import cascading.CascadingException;
030import cascading.flow.FlowProcess;
031import cascading.operation.BaseOperation;
032import cascading.operation.Function;
033import cascading.operation.FunctionCall;
034import cascading.operation.OperationCall;
035import cascading.operation.SerFunction;
036import cascading.tuple.Fields;
037import cascading.tuple.Tuple;
038
039/**
040 * Class BaseHashFunction is the base class for Message Digest based hashing operations.
041 * <p>
042 * All arguments to this {@link Function} will be concatenated, hashed by the given {@code algorithm},
043 * then encoded by the current encoding scheme.
044 * <p>
045 * If the value is null, an empty string is substituted.
046 */
047public abstract class BaseHashFunction extends BaseOperation<BaseHashFunction.Context>
048  implements Function<BaseHashFunction.Context>
049  {
050  public static final String DEFAULT_ALGORITHM = "SHA-1";
051  public static final String DEFAULT_CHARSET = "UTF-8";
052
053  protected class Context
054    {
055    Tuple tuple = Tuple.size( 1 );
056    WeakHashMap<String, String> cache = new WeakHashMap<>();
057    MessageDigest digest = getDigest();
058    }
059
060  protected final String algorithm;
061  protected final int maxLength;
062  protected final String charsetName;
063  protected final SerFunction<String, String> preDigest;
064  protected final SerFunction<StringBuilder, StringBuilder> postEncoding;
065
066  /**
067   * Constructor BaseHashFunction creates a new BaseHashFunction instance.
068   *
069   * @param fieldDeclaration of Fields
070   */
071  public BaseHashFunction( Fields fieldDeclaration )
072    {
073    this( fieldDeclaration, DEFAULT_ALGORITHM );
074    }
075
076  /**
077   * Constructor BaseHashFunction creates a new BaseHashFunction instance.
078   *
079   * @param fieldDeclaration of Fields
080   * @param preDigest        of SerFunction<String, String>
081   * @param postEncoding     of SerFunction<StringBuilder, StringBuilder>
082   */
083  public BaseHashFunction( Fields fieldDeclaration, SerFunction<String, String> preDigest, SerFunction<StringBuilder, StringBuilder> postEncoding )
084    {
085    this( fieldDeclaration, DEFAULT_ALGORITHM, Integer.MAX_VALUE, DEFAULT_CHARSET, preDigest, postEncoding );
086    }
087
088  /**
089   * Constructor BaseHashFunction creates a new BaseHashFunction instance.
090   *
091   * @param fieldDeclaration of Fields
092   * @param algorithm        of String
093   */
094  public BaseHashFunction( Fields fieldDeclaration, String algorithm )
095    {
096    this( fieldDeclaration, algorithm, Integer.MAX_VALUE );
097    }
098
099  /**
100   * Constructor BaseHashFunction creates a new BaseHashFunction instance.
101   *
102   * @param fieldDeclaration of Fields
103   * @param algorithm        of String
104   * @param preDigest        of SerFunction<String, String>
105   * @param postEncoding     of SerFunction<StringBuilder, StringBuilder>
106   */
107  public BaseHashFunction( Fields fieldDeclaration, String algorithm, SerFunction<String, String> preDigest, SerFunction<StringBuilder, StringBuilder> postEncoding )
108    {
109    this( fieldDeclaration, algorithm, Integer.MAX_VALUE, DEFAULT_CHARSET, preDigest, postEncoding );
110    }
111
112  /**
113   * Constructor BaseHashFunction creates a new BaseHashFunction instance.
114   *
115   * @param fieldDeclaration of Fields
116   * @param algorithm        of String
117   * @param maxLength        of int
118   */
119  public BaseHashFunction( Fields fieldDeclaration, String algorithm, int maxLength )
120    {
121    this( fieldDeclaration, algorithm, maxLength, DEFAULT_CHARSET );
122    }
123
124  /**
125   * Constructor BaseHashFunction creates a new BaseHashFunction instance.
126   *
127   * @param fieldDeclaration of Fields
128   * @param algorithm        of String
129   * @param maxLength        of int
130   * @param preDigest        of SerFunction<String, String>
131   * @param postEncoding     of SerFunction<StringBuilder, StringBuilder>
132   */
133  public BaseHashFunction( Fields fieldDeclaration, String algorithm, int maxLength, SerFunction<String, String> preDigest, SerFunction<StringBuilder, StringBuilder> postEncoding )
134    {
135    this( fieldDeclaration, algorithm, maxLength, DEFAULT_CHARSET, preDigest, postEncoding );
136    }
137
138  /**
139   * Constructor BaseHashFunction creates a new BaseHashFunction instance.
140   *
141   * @param fieldDeclaration of Fields
142   * @param algorithm        of String
143   * @param maxLength        of int
144   * @param charsetName      of String
145   */
146  public BaseHashFunction( Fields fieldDeclaration, String algorithm, int maxLength, String charsetName )
147    {
148    this( fieldDeclaration, algorithm, maxLength, charsetName, null, null );
149    }
150
151  /**
152   * Constructor BaseHashFunction creates a new BaseHashFunction instance.
153   *
154   * @param fieldDeclaration of Fields
155   * @param algorithm        of String
156   * @param maxLength        of int
157   * @param charsetName      of String
158   * @param preDigest        of SerFunction<String, String>
159   * @param postEncoding     of SerFunction<StringBuilder, StringBuilder>
160   */
161  public BaseHashFunction( Fields fieldDeclaration, String algorithm, int maxLength, String charsetName,
162                           SerFunction<String, String> preDigest,
163                           SerFunction<StringBuilder, StringBuilder> postEncoding )
164    {
165    super( fieldDeclaration );
166    this.algorithm = algorithm;
167    this.charsetName = charsetName;
168    this.maxLength = maxLength;
169    this.preDigest = preDigest == null ? SerFunction.identity() : preDigest;
170    this.postEncoding = postEncoding == null ? SerFunction.identity() : postEncoding;
171
172    if( fieldDeclaration.size() != 1 )
173      throw new IllegalArgumentException( "fieldDeclaration may only declare one field, was " + fieldDeclaration.print() );
174
175    verify();
176    }
177
178  /**
179   * Method verify ...
180   */
181  protected void verify()
182    {
183    getDigest();
184    getCharset();
185    }
186
187  /**
188   * Method getAlgorithm returns the algorithm of this BaseHashFunction object.
189   *
190   * @return the algorithm (type String) of this BaseHashFunction object.
191   */
192  public String getAlgorithm()
193    {
194    return algorithm;
195    }
196
197  /**
198   * Method prepare ...
199   *
200   * @param flowProcess   of FlowProcess
201   * @param operationCall of OperationCall<Context>
202   */
203  @Override
204  public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
205    {
206    operationCall.setContext( new Context() );
207    }
208
209  /**
210   * Method operate ...
211   *
212   * @param flowProcess  of FlowProcess
213   * @param functionCall of FunctionCall<Context>
214   */
215  @Override
216  public void operate( FlowProcess flowProcess, FunctionCall<Context> functionCall )
217    {
218    Context context = functionCall.getContext();
219    Tuple result = context.tuple;
220
221    String string = getValue( functionCall );
222
223    if( string == null )
224      string = "";
225
226    String encoded =
227      context.cache.computeIfAbsent( string, value ->
228      {
229      value = preDigest.apply( value );
230
231      byte[] bytes = value.getBytes( getCharset() );
232      byte[] digest = context.digest.digest( bytes ); // guaranteed single threading
233
234      StringBuilder buffer = new StringBuilder();
235
236      performEncoding( buffer, digest );
237
238      buffer = postEncoding.apply( buffer );
239
240      if( buffer.length() > maxLength )
241        return buffer.substring( 0, maxLength );
242
243      return buffer.toString();
244      } );
245
246    result.set( 0, encoded );
247
248    functionCall.getOutputCollector().add( result );
249    }
250
251  /**
252   * Method performEncoding ...
253   *
254   * @param buffer of StringBuilder
255   * @param digest of byte[]
256   */
257  protected abstract void performEncoding( StringBuilder buffer, byte[] digest );
258
259  /**
260   * Method getValue ...
261   *
262   * @param functionCall of FunctionCall<Context>
263   * @return String
264   */
265  protected String getValue( FunctionCall<Context> functionCall )
266    {
267    // if one argument
268    if( functionCall.getArguments().size() == 1 )
269      return functionCall.getArguments().getString( 0 );
270
271    // if many arguments
272    Iterator<String> values = functionCall.getArguments().asIterableOf( String.class ).iterator();
273    StringBuilder result = new StringBuilder();
274
275    while( values.hasNext() )
276      {
277      String next = values.next();
278
279      if( next != null )
280        result.append( next );
281      }
282
283    return result.toString();
284    }
285
286  /**
287   * Method getDigest returns the digest of this BaseHashFunction object.
288   *
289   * @return the digest (type MessageDigest) of this BaseHashFunction object.
290   */
291  protected MessageDigest getDigest()
292    {
293    try
294      {
295      return MessageDigest.getInstance( getAlgorithm() );
296      }
297    catch( NoSuchAlgorithmException exception )
298      {
299      throw new CascadingException( "unknown digest algorithm: " + getAlgorithm(), exception );
300      }
301    }
302
303  /**
304   * Method getCharset returns the charset of this BaseHashFunction object.
305   *
306   * @return the charset (type Charset) of this BaseHashFunction object.
307   */
308  protected Charset getCharset()
309    {
310    return Charset.forName( charsetName );
311    }
312  }