001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tuple.hadoop;
022    
023    import java.util.ArrayList;
024    import java.util.LinkedHashMap;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Properties;
028    
029    import cascading.property.Props;
030    import cascading.util.Util;
031    
032    /**
033     * Class TupleSerializationProps is a fluent interface for building properties to be passed to a
034     * {@link cascading.flow.FlowConnector} before creating new {@link cascading.flow.Flow} instances.
035     * <p/>
036     * See {@link TupleSerialization} for details on these properties.
037     *
038     * @see TupleSerialization
039     */
040    public class TupleSerializationProps extends Props
041      {
042      public static final String SERIALIZATION_TOKENS = "cascading.serialization.tokens";
043      public static final String HADOOP_IO_SERIALIZATIONS = "io.serializations";
044    
045      Map<Integer, String> serializationTokens = new LinkedHashMap<Integer, String>();
046      List<String> hadoopSerializations = new ArrayList<String>();
047    
048      /**
049       * Adds the given token and className pair as a serialization token property. During object serialization and deserialization,
050       * the given token will be used instead of the className when an instance of the className is encountered.
051       *
052       * @param properties of type Map
053       * @param token      of type int
054       * @param className  of type String
055       */
056      public static void addSerializationToken( Map<Object, Object> properties, int token, String className )
057        {
058        String tokens = getSerializationTokens( properties );
059    
060        properties.put( SERIALIZATION_TOKENS, Util.join( ",", Util.removeNulls( tokens, token + "=" + className ) ) );
061        }
062    
063      /**
064       * Returns the serialization tokens property.
065       *
066       * @param properties of type Map
067       * @return returns a String
068       */
069      public static String getSerializationTokens( Map<Object, Object> properties )
070        {
071        return (String) properties.get( SERIALIZATION_TOKENS );
072        }
073    
074      /**
075       * Adds the given className as a Hadoop IO serialization class.
076       *
077       * @param properties of type Map
078       * @param className  of type String
079       */
080      public static void addSerialization( Map<Object, Object> properties, String className )
081        {
082        String serializations = (String) properties.get( HADOOP_IO_SERIALIZATIONS );
083    
084        properties.put( HADOOP_IO_SERIALIZATIONS, Util.join( ",", Util.removeNulls( serializations, className ) ) );
085        }
086    
087      public static TupleSerializationProps tupleSerializationProps()
088        {
089        return new TupleSerializationProps();
090        }
091    
092      public TupleSerializationProps()
093        {
094        }
095    
096      public Map<Integer, String> getSerializationTokens()
097        {
098        return serializationTokens;
099        }
100    
101      public TupleSerializationProps setSerializationTokens( Map<Integer, String> serializationTokens )
102        {
103        this.serializationTokens = serializationTokens;
104    
105        return this;
106        }
107    
108      public TupleSerializationProps addSerializationTokens( Map<Integer, String> serializationTokens )
109        {
110        this.serializationTokens.putAll( serializationTokens );
111    
112        return this;
113        }
114    
115      public TupleSerializationProps addSerializationToken( int token, String serialization )
116        {
117        this.serializationTokens.put( token, serialization );
118    
119        return this;
120        }
121    
122      public List<String> getHadoopSerializations()
123        {
124        return hadoopSerializations;
125        }
126    
127      public TupleSerializationProps setHadoopSerializations( List<String> hadoopSerializations )
128        {
129        this.hadoopSerializations = hadoopSerializations;
130    
131        return this;
132        }
133    
134      public TupleSerializationProps addHadoopSerializations( List<String> hadoopSerializations )
135        {
136        this.hadoopSerializations.addAll( hadoopSerializations );
137    
138        return this;
139        }
140    
141      public TupleSerializationProps addHadoopSerialization( String hadoopSerialization )
142        {
143        this.hadoopSerializations.add( hadoopSerialization );
144    
145        return this;
146        }
147    
148      @Override
149      protected void addPropertiesTo( Properties properties )
150        {
151        for( Map.Entry<Integer, String> entry : serializationTokens.entrySet() )
152          addSerializationToken( properties, entry.getKey(), entry.getValue() );
153    
154        for( String hadoopSerialization : hadoopSerializations )
155          addSerialization( properties, hadoopSerialization );
156        }
157      }