001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.hadoop;
022    
023    import java.util.ArrayList;
024    import java.util.LinkedHashMap;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Properties;
028    
029    import cascading.property.Props;
030    import cascading.util.Util;
031    
032    /**
033     * Class TupleSerializationProps is a fluent interface for building properties to be passed to a
034     * {@link cascading.flow.FlowConnector} before creating new {@link cascading.flow.Flow} instances.
035     * <p/>
036     * See {@link TupleSerialization} for details on these properties.
037     *
038     * @see TupleSerialization
039     */
040    public class TupleSerializationProps extends Props
041      {
042      public static final String SERIALIZATION_TOKENS = "cascading.serialization.tokens";
043      public static final String HADOOP_IO_SERIALIZATIONS = "io.serializations";
044    
045      Map<Integer, String> serializationTokens = new LinkedHashMap<Integer, String>();
046      List<String> hadoopSerializations = new ArrayList<String>();
047    
048      /**
049       * Adds the given token and className pair as a serialization token property. During object serialization and deserialization,
050       * the given token will be used instead of the className when an instance of the className is encountered.
051       *
052       * @param properties of type Map
053       * @param token      of type int
054       * @param className  of type String
055       */
056      public static void addSerializationToken( Map<Object, Object> properties, int token, String className )
057        {
058        String tokens = getSerializationTokens( properties );
059    
060        properties.put( SERIALIZATION_TOKENS, Util.join( ",", Util.removeNulls( tokens, token + "=" + className ) ) );
061        }
062    
063      /**
064       * Returns the serialization tokens property.
065       *
066       * @param properties of type Map
067       * @return returns a String
068       */
069      public static String getSerializationTokens( Map<Object, Object> properties )
070        {
071        return (String) properties.get( SERIALIZATION_TOKENS );
072        }
073    
074      /**
075       * Adds the given className as a Hadoop IO serialization class.
076       *
077       * @param properties of type Map
078       * @param className  of type String
079       */
080      public static void addSerialization( Map<Object, Object> properties, String className )
081        {
082        String serializations = (String) properties.get( HADOOP_IO_SERIALIZATIONS );
083    
084        properties.put( HADOOP_IO_SERIALIZATIONS, Util.join( ",", Util.removeNulls( serializations, className ) ) );
085        }
086    
087      /**
088       * Creates a new TupleSerializationProps instance.
089       *
090       * @return TupleSerializationProps instance
091       */
092      public static TupleSerializationProps tupleSerializationProps()
093        {
094        return new TupleSerializationProps();
095        }
096    
097      public TupleSerializationProps()
098        {
099        }
100    
101      public Map<Integer, String> getSerializationTokens()
102        {
103        return serializationTokens;
104        }
105    
106      /**
107       * Method setSerializationTokens sets the given integer tokens and classNames Map as a serialization properties.
108       * <p/>
109       * During object serialization and deserialization, the given tokens will be used instead of the className when an
110       * instance of the className is encountered.
111       *
112       * @param serializationTokens Map of Integer tokens and String classnames
113       * @return this
114       */
115      public TupleSerializationProps setSerializationTokens( Map<Integer, String> serializationTokens )
116        {
117        this.serializationTokens = serializationTokens;
118    
119        return this;
120        }
121    
122      /**
123       * Method addSerializationTokens adds the given integer tokens and classNames Map as a serialization properties.
124       * <p/>
125       * During object serialization and deserialization, the given tokens will be used instead of the className when an
126       * instance of the className is encountered.
127       *
128       * @param serializationTokens Map of Integer tokens and String classnames
129       * @return this
130       */
131      public TupleSerializationProps addSerializationTokens( Map<Integer, String> serializationTokens )
132        {
133        this.serializationTokens.putAll( serializationTokens );
134    
135        return this;
136        }
137    
138      /**
139       * Method addSerializationToken adds the given integer token and classNames as a serialization properties.
140       * <p/>
141       * During object serialization and deserialization, the given tokens will be used instead of the className when an
142       * instance of the className is encountered.
143       *
144       * @param token                  type int
145       * @param serializationClassName type String
146       * @return this
147       */
148      public TupleSerializationProps addSerializationToken( int token, String serializationClassName )
149        {
150        this.serializationTokens.put( token, serializationClassName );
151    
152        return this;
153        }
154    
155      public List<String> getHadoopSerializations()
156        {
157        return hadoopSerializations;
158        }
159    
160      /**
161       * Method setHadoopSerializations sets the Hadoop serialization classNames to be used as properties.
162       *
163       * @param hadoopSerializationClassNames List of classNames
164       * @return this
165       */
166      public TupleSerializationProps setHadoopSerializations( List<String> hadoopSerializationClassNames )
167        {
168        this.hadoopSerializations = hadoopSerializationClassNames;
169    
170        return this;
171        }
172    
173      /**
174       * Method addHadoopSerializations adds the Hadoop serialization classNames to be used as properties.
175       *
176       * @param hadoopSerializationClassNames List of classNames
177       * @return this
178       */
179      public TupleSerializationProps addHadoopSerializations( List<String> hadoopSerializationClassNames )
180        {
181        this.hadoopSerializations.addAll( hadoopSerializationClassNames );
182    
183        return this;
184        }
185    
186      /**
187       * Method addHadoopSerialization adds a Hadoop serialization className to be used as properties.
188       *
189       * @param hadoopSerializationClassName List of classNames
190       * @return this
191       */
192      public TupleSerializationProps addHadoopSerialization( String hadoopSerializationClassName )
193        {
194        this.hadoopSerializations.add( hadoopSerializationClassName );
195    
196        return this;
197        }
198    
199      @Override
200      protected void addPropertiesTo( Properties properties )
201        {
202        for( Map.Entry<Integer, String> entry : serializationTokens.entrySet() )
203          addSerializationToken( properties, entry.getKey(), entry.getValue() );
204    
205        for( String hadoopSerialization : hadoopSerializations )
206          addSerialization( properties, hadoopSerialization );
207        }
208      }