001/* 002 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.operation.hash; 022 023import java.nio.charset.Charset; 024import java.security.MessageDigest; 025import java.security.NoSuchAlgorithmException; 026import java.util.Iterator; 027import java.util.WeakHashMap; 028 029import cascading.CascadingException; 030import cascading.flow.FlowProcess; 031import cascading.operation.BaseOperation; 032import cascading.operation.Function; 033import cascading.operation.FunctionCall; 034import cascading.operation.OperationCall; 035import cascading.operation.SerFunction; 036import cascading.tuple.Fields; 037import cascading.tuple.Tuple; 038 039/** 040 * Class BaseHashFunction is the base class for Message Digest based hashing operations. 041 * <p> 042 * All arguments to this {@link Function} will be concatenated, hashed by the given {@code algorithm}, 043 * then encoded by the current encoding scheme. 044 * <p> 045 * If the value is null, an empty string is substituted. 046 */ 047public abstract class BaseHashFunction extends BaseOperation<BaseHashFunction.Context> 048 implements Function<BaseHashFunction.Context> 049 { 050 public static final String DEFAULT_ALGORITHM = "SHA-1"; 051 public static final String DEFAULT_CHARSET = "UTF-8"; 052 053 protected class Context 054 { 055 Tuple tuple = Tuple.size( 1 ); 056 WeakHashMap<String, String> cache = new WeakHashMap<>(); 057 MessageDigest digest = getDigest(); 058 } 059 060 protected final String algorithm; 061 protected final int maxLength; 062 protected final String charsetName; 063 protected final SerFunction<String, String> preDigest; 064 protected final SerFunction<StringBuilder, StringBuilder> postEncoding; 065 066 /** 067 * Constructor BaseHashFunction creates a new BaseHashFunction instance. 068 * 069 * @param fieldDeclaration of Fields 070 */ 071 public BaseHashFunction( Fields fieldDeclaration ) 072 { 073 this( fieldDeclaration, DEFAULT_ALGORITHM ); 074 } 075 076 /** 077 * Constructor BaseHashFunction creates a new BaseHashFunction instance. 078 * 079 * @param fieldDeclaration of Fields 080 * @param preDigest of SerFunction<String, String> 081 * @param postEncoding of SerFunction<StringBuilder, StringBuilder> 082 */ 083 public BaseHashFunction( Fields fieldDeclaration, SerFunction<String, String> preDigest, SerFunction<StringBuilder, StringBuilder> postEncoding ) 084 { 085 this( fieldDeclaration, DEFAULT_ALGORITHM, Integer.MAX_VALUE, DEFAULT_CHARSET, preDigest, postEncoding ); 086 } 087 088 /** 089 * Constructor BaseHashFunction creates a new BaseHashFunction instance. 090 * 091 * @param fieldDeclaration of Fields 092 * @param algorithm of String 093 */ 094 public BaseHashFunction( Fields fieldDeclaration, String algorithm ) 095 { 096 this( fieldDeclaration, algorithm, Integer.MAX_VALUE ); 097 } 098 099 /** 100 * Constructor BaseHashFunction creates a new BaseHashFunction instance. 101 * 102 * @param fieldDeclaration of Fields 103 * @param algorithm of String 104 * @param preDigest of SerFunction<String, String> 105 * @param postEncoding of SerFunction<StringBuilder, StringBuilder> 106 */ 107 public BaseHashFunction( Fields fieldDeclaration, String algorithm, SerFunction<String, String> preDigest, SerFunction<StringBuilder, StringBuilder> postEncoding ) 108 { 109 this( fieldDeclaration, algorithm, Integer.MAX_VALUE, DEFAULT_CHARSET, preDigest, postEncoding ); 110 } 111 112 /** 113 * Constructor BaseHashFunction creates a new BaseHashFunction instance. 114 * 115 * @param fieldDeclaration of Fields 116 * @param algorithm of String 117 * @param maxLength of int 118 */ 119 public BaseHashFunction( Fields fieldDeclaration, String algorithm, int maxLength ) 120 { 121 this( fieldDeclaration, algorithm, maxLength, DEFAULT_CHARSET ); 122 } 123 124 /** 125 * Constructor BaseHashFunction creates a new BaseHashFunction instance. 126 * 127 * @param fieldDeclaration of Fields 128 * @param algorithm of String 129 * @param maxLength of int 130 * @param preDigest of SerFunction<String, String> 131 * @param postEncoding of SerFunction<StringBuilder, StringBuilder> 132 */ 133 public BaseHashFunction( Fields fieldDeclaration, String algorithm, int maxLength, SerFunction<String, String> preDigest, SerFunction<StringBuilder, StringBuilder> postEncoding ) 134 { 135 this( fieldDeclaration, algorithm, maxLength, DEFAULT_CHARSET, preDigest, postEncoding ); 136 } 137 138 /** 139 * Constructor BaseHashFunction creates a new BaseHashFunction instance. 140 * 141 * @param fieldDeclaration of Fields 142 * @param algorithm of String 143 * @param maxLength of int 144 * @param charsetName of String 145 */ 146 public BaseHashFunction( Fields fieldDeclaration, String algorithm, int maxLength, String charsetName ) 147 { 148 this( fieldDeclaration, algorithm, maxLength, charsetName, null, null ); 149 } 150 151 /** 152 * Constructor BaseHashFunction creates a new BaseHashFunction instance. 153 * 154 * @param fieldDeclaration of Fields 155 * @param algorithm of String 156 * @param maxLength of int 157 * @param charsetName of String 158 * @param preDigest of SerFunction<String, String> 159 * @param postEncoding of SerFunction<StringBuilder, StringBuilder> 160 */ 161 public BaseHashFunction( Fields fieldDeclaration, String algorithm, int maxLength, String charsetName, 162 SerFunction<String, String> preDigest, 163 SerFunction<StringBuilder, StringBuilder> postEncoding ) 164 { 165 super( fieldDeclaration ); 166 this.algorithm = algorithm; 167 this.charsetName = charsetName; 168 this.maxLength = maxLength; 169 this.preDigest = preDigest == null ? SerFunction.identity() : preDigest; 170 this.postEncoding = postEncoding == null ? SerFunction.identity() : postEncoding; 171 172 if( fieldDeclaration.size() != 1 ) 173 throw new IllegalArgumentException( "fieldDeclaration may only declare one field, was " + fieldDeclaration.print() ); 174 175 verify(); 176 } 177 178 /** 179 * Method verify ... 180 */ 181 protected void verify() 182 { 183 getDigest(); 184 getCharset(); 185 } 186 187 /** 188 * Method getAlgorithm returns the algorithm of this BaseHashFunction object. 189 * 190 * @return the algorithm (type String) of this BaseHashFunction object. 191 */ 192 public String getAlgorithm() 193 { 194 return algorithm; 195 } 196 197 /** 198 * Method prepare ... 199 * 200 * @param flowProcess of FlowProcess 201 * @param operationCall of OperationCall<Context> 202 */ 203 @Override 204 public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall ) 205 { 206 operationCall.setContext( new Context() ); 207 } 208 209 /** 210 * Method operate ... 211 * 212 * @param flowProcess of FlowProcess 213 * @param functionCall of FunctionCall<Context> 214 */ 215 @Override 216 public void operate( FlowProcess flowProcess, FunctionCall<Context> functionCall ) 217 { 218 Context context = functionCall.getContext(); 219 Tuple result = context.tuple; 220 221 String string = getValue( functionCall ); 222 223 if( string == null ) 224 string = ""; 225 226 String encoded = 227 context.cache.computeIfAbsent( string, value -> 228 { 229 value = preDigest.apply( value ); 230 231 byte[] bytes = value.getBytes( getCharset() ); 232 byte[] digest = context.digest.digest( bytes ); // guaranteed single threading 233 234 StringBuilder buffer = new StringBuilder(); 235 236 performEncoding( buffer, digest ); 237 238 buffer = postEncoding.apply( buffer ); 239 240 if( buffer.length() > maxLength ) 241 return buffer.substring( 0, maxLength ); 242 243 return buffer.toString(); 244 } ); 245 246 result.set( 0, encoded ); 247 248 functionCall.getOutputCollector().add( result ); 249 } 250 251 /** 252 * Method performEncoding ... 253 * 254 * @param buffer of StringBuilder 255 * @param digest of byte[] 256 */ 257 protected abstract void performEncoding( StringBuilder buffer, byte[] digest ); 258 259 /** 260 * Method getValue ... 261 * 262 * @param functionCall of FunctionCall<Context> 263 * @return String 264 */ 265 protected String getValue( FunctionCall<Context> functionCall ) 266 { 267 // if one argument 268 if( functionCall.getArguments().size() == 1 ) 269 return functionCall.getArguments().getString( 0 ); 270 271 // if many arguments 272 Iterator<String> values = functionCall.getArguments().asIterableOf( String.class ).iterator(); 273 StringBuilder result = new StringBuilder(); 274 275 while( values.hasNext() ) 276 { 277 String next = values.next(); 278 279 if( next != null ) 280 result.append( next ); 281 } 282 283 return result.toString(); 284 } 285 286 /** 287 * Method getDigest returns the digest of this BaseHashFunction object. 288 * 289 * @return the digest (type MessageDigest) of this BaseHashFunction object. 290 */ 291 protected MessageDigest getDigest() 292 { 293 try 294 { 295 return MessageDigest.getInstance( getAlgorithm() ); 296 } 297 catch( NoSuchAlgorithmException exception ) 298 { 299 throw new CascadingException( "unknown digest algorithm: " + getAlgorithm(), exception ); 300 } 301 } 302 303 /** 304 * Method getCharset returns the charset of this BaseHashFunction object. 305 * 306 * @return the charset (type Charset) of this BaseHashFunction object. 307 */ 308 protected Charset getCharset() 309 { 310 return Charset.forName( charsetName ); 311 } 312 }