001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.tuple.hadoop; 022 023 import java.util.ArrayList; 024 import java.util.LinkedHashMap; 025 import java.util.List; 026 import java.util.Map; 027 import java.util.Properties; 028 029 import cascading.property.Props; 030 import cascading.util.Util; 031 032 /** 033 * Class TupleSerializationProps is a fluent interface for building properties to be passed to a 034 * {@link cascading.flow.FlowConnector} before creating new {@link cascading.flow.Flow} instances. 035 * <p/> 036 * See {@link TupleSerialization} for details on these properties. 037 * 038 * @see TupleSerialization 039 */ 040 public class TupleSerializationProps extends Props 041 { 042 public static final String SERIALIZATION_TOKENS = "cascading.serialization.tokens"; 043 public static final String HADOOP_IO_SERIALIZATIONS = "io.serializations"; 044 045 Map<Integer, String> serializationTokens = new LinkedHashMap<Integer, String>(); 046 List<String> hadoopSerializations = new ArrayList<String>(); 047 048 /** 049 * Adds the given token and className pair as a serialization token property. During object serialization and deserialization, 050 * the given token will be used instead of the className when an instance of the className is encountered. 051 * 052 * @param properties of type Map 053 * @param token of type int 054 * @param className of type String 055 */ 056 public static void addSerializationToken( Map<Object, Object> properties, int token, String className ) 057 { 058 String tokens = getSerializationTokens( properties ); 059 060 properties.put( SERIALIZATION_TOKENS, Util.join( ",", Util.removeNulls( tokens, token + "=" + className ) ) ); 061 } 062 063 /** 064 * Returns the serialization tokens property. 065 * 066 * @param properties of type Map 067 * @return returns a String 068 */ 069 public static String getSerializationTokens( Map<Object, Object> properties ) 070 { 071 return (String) properties.get( SERIALIZATION_TOKENS ); 072 } 073 074 /** 075 * Adds the given className as a Hadoop IO serialization class. 076 * 077 * @param properties of type Map 078 * @param className of type String 079 */ 080 public static void addSerialization( Map<Object, Object> properties, String className ) 081 { 082 String serializations = (String) properties.get( HADOOP_IO_SERIALIZATIONS ); 083 084 properties.put( HADOOP_IO_SERIALIZATIONS, Util.join( ",", Util.removeNulls( serializations, className ) ) ); 085 } 086 087 public static TupleSerializationProps tupleSerializationProps() 088 { 089 return new TupleSerializationProps(); 090 } 091 092 public TupleSerializationProps() 093 { 094 } 095 096 public Map<Integer, String> getSerializationTokens() 097 { 098 return serializationTokens; 099 } 100 101 public TupleSerializationProps setSerializationTokens( Map<Integer, String> serializationTokens ) 102 { 103 this.serializationTokens = serializationTokens; 104 105 return this; 106 } 107 108 public TupleSerializationProps addSerializationTokens( Map<Integer, String> serializationTokens ) 109 { 110 this.serializationTokens.putAll( serializationTokens ); 111 112 return this; 113 } 114 115 public TupleSerializationProps addSerializationToken( int token, String serialization ) 116 { 117 this.serializationTokens.put( token, serialization ); 118 119 return this; 120 } 121 122 public List<String> getHadoopSerializations() 123 { 124 return hadoopSerializations; 125 } 126 127 public TupleSerializationProps setHadoopSerializations( List<String> hadoopSerializations ) 128 { 129 this.hadoopSerializations = hadoopSerializations; 130 131 return this; 132 } 133 134 public TupleSerializationProps addHadoopSerializations( List<String> hadoopSerializations ) 135 { 136 this.hadoopSerializations.addAll( hadoopSerializations ); 137 138 return this; 139 } 140 141 public TupleSerializationProps addHadoopSerialization( String hadoopSerialization ) 142 { 143 this.hadoopSerializations.add( hadoopSerialization ); 144 145 return this; 146 } 147 148 @Override 149 protected void addPropertiesTo( Properties properties ) 150 { 151 for( Map.Entry<Integer, String> entry : serializationTokens.entrySet() ) 152 addSerializationToken( properties, entry.getKey(), entry.getValue() ); 153 154 for( String hadoopSerialization : hadoopSerializations ) 155 addSerialization( properties, hadoopSerialization ); 156 } 157 }