001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.tap.hadoop;
022    
023    import java.util.Map;
024    import java.util.Properties;
025    
026    import cascading.property.Props;
027    
028    /**
029     * Class HfsProps is a fluent helper for setting various Hadoop FS level properties that some
030     * {@link cascading.flow.Flow} may or may not be required to have set. These properties are typically passed to a Flow
031     * via a {@link cascading.flow.FlowConnector}.
032     */
033    public class HfsProps extends Props
034      {
035      /** Field TEMPORARY_DIRECTORY */
036      public static final String TEMPORARY_DIRECTORY = "cascading.tmp.dir";
037      /** Fields LOCAL_MODE_SCHEME * */
038      public static final String LOCAL_MODE_SCHEME = "cascading.hadoop.localmode.scheme";
039      /** Field COMBINE_INPUT_FILES */
040      public static final String COMBINE_INPUT_FILES = "cascading.hadoop.hfs.combine.files";
041      /** Field COMBINE_INPUT_FILES_SAFEMODE */
042      public static final String COMBINE_INPUT_FILES_SAFE_MODE = "cascading.hadoop.hfs.combine.safemode";
043      /** Field COMBINE_INPUT_FILES_SIZE_MAX */
044      public static final String COMBINE_INPUT_FILES_SIZE_MAX = "cascading.hadoop.hfs.combine.max.size";
045    
046      protected String temporaryDirectory;
047      protected String localModeScheme;
048      protected Boolean useCombinedInput;
049      protected Long combinedInputMaxSize;
050      protected Boolean combinedInputSafeMode;
051    
052      /**
053       * Method setTemporaryDirectory sets the temporary directory on the given properties object.
054       *
055       * @param properties         of type Map<Object,Object>
056       * @param temporaryDirectory of type String
057       */
058      public static void setTemporaryDirectory( Map<Object, Object> properties, String temporaryDirectory )
059        {
060        properties.put( TEMPORARY_DIRECTORY, temporaryDirectory );
061        }
062    
063      /**
064       * Method setLocalModeScheme provides a means to change the scheme value used to detect when a
065       * MapReduce job should be run in Hadoop local mode. By default the value is {@code "file"}, set to
066       * {@code "none"} to disable entirely.
067       *
068       * @param properties of type Map<Object,Object>
069       * @param scheme     a String
070       */
071      public static void setLocalModeScheme( Map<Object, Object> properties, String scheme )
072        {
073        properties.put( LOCAL_MODE_SCHEME, scheme );
074        }
075    
076      /**
077       * Method setUseCombinedInput provides a means to indicate whether to leverage
078       * {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat} for the input format. By default it is false.
079       * <p/>
080       * Use {@link #setCombinedInputMaxSize(long)} to set the max split/combined input size. Other specific
081       * properties must be specified directly if needed. Specifically "mapred.min.split.size.per.node" and
082       * "mapred.min.split.size.per.rack", which are 0 by default.
083       *
084       * @param properties of type Map<Object,Object>
085       * @param combine    a boolean
086       */
087      public static void setUseCombinedInput( Map<Object, Object> properties, Boolean combine )
088        {
089        if( combine != null )
090          properties.put( COMBINE_INPUT_FILES, Boolean.toString( combine ) );
091        }
092    
093      /**
094       * Method setUseCombinedInputSafeMode toggles safe mode when using
095       * {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat}. Safe mode will throw an exception if the underlying
096       * InputFormat is not of type {@link org.apache.hadoop.mapred.FileInputFormat}. If safeMode is off a warning will
097       * be logged instead. safeMode is on by default.
098       * <p/>
099       * Setting this property when not setting {@link #setUseCombinedInput(boolean)} to true has no effect.
100       *
101       * @param properties of type Map<Object,Object>
102       * @param safeMode   a boolean
103       */
104      public static void setUseCombinedInputSafeMode( Map<Object, Object> properties, Boolean safeMode )
105        {
106        if( safeMode != null )
107          properties.put( COMBINE_INPUT_FILES_SAFE_MODE, Boolean.toString( safeMode ) );
108        }
109    
110    
111      /**
112       * Method setCombinedInputMaxSize sets the maximum input split size to be used.
113       * <p/>
114       * This property is an alias for the Hadoop property "mapred.max.split.size".
115       *
116       * @param properties of type Map<Object,Object>
117       * @param size       of type long
118       */
119      public static void setCombinedInputMaxSize( Map<Object, Object> properties, Long size )
120        {
121        if( size != null )
122          properties.put( COMBINE_INPUT_FILES_SIZE_MAX, Long.toString( size ) );
123        }
124    
125      public HfsProps()
126        {
127        }
128    
129      public String getTemporaryDirectory()
130        {
131        return temporaryDirectory;
132        }
133    
134      /**
135       * Method setTemporaryDirectory sets the temporary directory for use on the underlying filesystem.
136       *
137       * @param temporaryDirectory of type String
138       * @return returns this instance
139       */
140      public HfsProps setTemporaryDirectory( String temporaryDirectory )
141        {
142        this.temporaryDirectory = temporaryDirectory;
143    
144        return this;
145        }
146    
147      public String getLocalModeScheme()
148        {
149        return localModeScheme;
150        }
151    
152      /**
153       * Method setLocalModeScheme provides a means to change the scheme value used to detect when a
154       * MapReduce job should be run in Hadoop local mode. By default the value is {@code "file"}, set to
155       * {@code "none"} to disable entirely.
156       *
157       * @param localModeScheme of type String
158       * @return returns this instance
159       */
160      public HfsProps setLocalModeScheme( String localModeScheme )
161        {
162        this.localModeScheme = localModeScheme;
163    
164        return this;
165        }
166    
167      public boolean isUseCombinedInput()
168        {
169        return useCombinedInput;
170        }
171    
172      /**
173       * Method setUseCombinedInput provides a means to indicate whether to leverage
174       * {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat} for the input format. By default it is false.
175       *
176       * @param useCombinedInput boolean
177       * @return returns this instance
178       */
179      public HfsProps setUseCombinedInput( boolean useCombinedInput )
180        {
181        this.useCombinedInput = useCombinedInput;
182    
183        return this;
184        }
185    
186      public Long getCombinedInputMaxSize()
187        {
188        return combinedInputMaxSize;
189        }
190    
191      /**
192       * Method setCombinedInputMaxSize sets the maximum input split size to be used.
193       * <p/>
194       * This value is not honored unless {@link #setUseCombinedInput(boolean)} is {@code true}.
195       *
196       * @param combinedInputMaxSize of type long
197       * @return returns this instance
198       */
199      public HfsProps setCombinedInputMaxSize( long combinedInputMaxSize )
200        {
201        this.combinedInputMaxSize = combinedInputMaxSize;
202    
203        return this;
204        }
205    
206      public boolean isUseCombinedInputSafeMode()
207        {
208        return combinedInputSafeMode;
209        }
210    
211      /**
212       * Method setUseCombinedInputSafeMode toggles safe mode when using
213       * {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat}. Safe mode will throw an exception if the underlying
214       * InputFormat is not of type {@link org.apache.hadoop.mapred.FileInputFormat}. If safeMode is off a warning will
215       * be logged instead. safeMode is on by default.
216       * <p/>
217       * Setting this property when not setting {@link #setUseCombinedInput(boolean)} to true has no effect.
218       *
219       * @param combinedInputSafeMode boolean
220       * @return returns this instance
221       */
222      public HfsProps setUseCombinedInputSafeMode( boolean combinedInputSafeMode )
223        {
224        this.combinedInputSafeMode = combinedInputSafeMode;
225    
226        return this;
227        }
228    
229    
230      @Override
231      protected void addPropertiesTo( Properties properties )
232        {
233        setTemporaryDirectory( properties, temporaryDirectory );
234        setLocalModeScheme( properties, localModeScheme );
235        setUseCombinedInput( properties, useCombinedInput );
236        setCombinedInputMaxSize( properties, combinedInputMaxSize );
237        setUseCombinedInputSafeMode( properties, combinedInputSafeMode );
238        }
239      }