001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.platform;
022
023import java.io.IOException;
024import java.util.Comparator;
025import java.util.HashMap;
026import java.util.Map;
027
028import cascading.flow.FlowConnector;
029import cascading.flow.FlowProcess;
030import cascading.property.AppProps;
031import cascading.scheme.Scheme;
032import cascading.scheme.util.FieldTypeResolver;
033import cascading.tap.SinkMode;
034import cascading.tap.Tap;
035import cascading.tap.partition.Partition;
036import cascading.tuple.Fields;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 *
042 */
043public abstract class TestPlatform
044  {
045  private static final Logger LOG = LoggerFactory.getLogger( TestPlatform.class );
046
047  public static final String CLUSTER_TESTING_PROPERTY = "test.cluster.enabled";
048  public static final String PLATFORM_PREFIX = "platform.";
049
050  private boolean useCluster = false;
051  private boolean enableCluster = true;
052  protected int numMappers = 0;
053  protected int numReducers = 0;
054  protected int numGatherPartitions = 0;
055
056  /**
057   * Method getGlobalProperties fetches all "platform." prefixed system properties.
058   * <p/>
059   * Sub-classes of TestPlatform should use these values as overrides before returning from
060   * {@link #getProperties()}.
061   *
062   * @return a Map of properties
063   */
064  public static Map<Object, Object> getGlobalProperties()
065    {
066    HashMap<Object, Object> properties = new HashMap<Object, Object>();
067
068    for( String propertyName : System.getProperties().stringPropertyNames() )
069      {
070      if( propertyName.startsWith( PLATFORM_PREFIX ) )
071        properties.put( propertyName.substring( PLATFORM_PREFIX.length() ), System.getProperty( propertyName ) );
072      }
073
074    if( !properties.isEmpty() )
075      LOG.info( "platform property overrides: ", properties );
076
077    return properties;
078    }
079
080  protected TestPlatform()
081    {
082    enableCluster = Boolean.parseBoolean( System.getProperty( CLUSTER_TESTING_PROPERTY, Boolean.toString( enableCluster ) ) );
083    }
084
085  public String getName()
086    {
087    return getClass().getSimpleName().replaceAll( "^(.*)Platform$", "$1" ).toLowerCase();
088    }
089
090  public boolean supportsGroupByAfterMerge()
091    {
092    return false;
093    }
094
095  public boolean isMapReduce()
096    {
097    return false;
098    }
099
100  public boolean isDAG()
101    {
102    return false;
103    }
104
105  public int getNumMappers()
106    {
107    return numMappers;
108    }
109
110  public void setNumMappers( int numMappers )
111    {
112    this.numMappers = numMappers;
113    }
114
115  public int getNumReducers()
116    {
117    return numReducers;
118    }
119
120  public void setNumReducers( int numReducers )
121    {
122    this.numReducers = numReducers;
123    }
124
125  public int getNumGatherPartitions()
126    {
127    return numGatherPartitions;
128    }
129
130  public void setNumGatherPartitions( int numGatherPartitions )
131    {
132    this.numGatherPartitions = numGatherPartitions;
133    }
134
135  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
136    {
137    // do nothing
138    }
139
140  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
141    {
142    // do nothing
143    }
144
145  public void setNumGatherPartitionTasks( Map<Object, Object> properties, int numReduceTasks )
146    {
147    // do nothing
148    }
149
150  public Integer getNumMapTasks( Map<Object, Object> properties )
151    {
152    return null;
153    }
154
155  public Integer getNumReduceTasks( Map<Object, Object> properties )
156    {
157    return null;
158    }
159
160  public Integer getNumGatherPartitionTasks( Map<Object, Object> properties )
161    {
162    return null;
163    }
164
165  public abstract void setUp() throws IOException;
166
167  public abstract Map<Object, Object> getProperties();
168
169  public abstract void tearDown();
170
171  public void setUseCluster( boolean useCluster )
172    {
173    this.useCluster = useCluster;
174    }
175
176  public boolean isUseCluster()
177    {
178    return enableCluster && useCluster;
179    }
180
181  public abstract void copyFromLocal( String inputFile ) throws IOException;
182
183  public abstract void copyToLocal( String outputFile ) throws IOException;
184
185  public abstract boolean remoteExists( String outputFile ) throws IOException;
186
187  public abstract boolean remoteRemove( String outputFile, boolean recursive ) throws IOException;
188
189  public abstract FlowProcess getFlowProcess();
190
191  public abstract FlowConnector getFlowConnector( Map<Object, Object> properties );
192
193  public FlowConnector getFlowConnector()
194    {
195    return getFlowConnector( getProperties() );
196    }
197
198  public abstract Tap getTap( Scheme scheme, String filename, SinkMode mode );
199
200  public Tap getTextFile( Fields sourceFields, String filename )
201    {
202    return getTextFile( sourceFields, filename, SinkMode.KEEP );
203    }
204
205  public Tap getTextFile( String filename )
206    {
207    return getTextFile( filename, SinkMode.KEEP );
208    }
209
210  public Tap getTextFile( String filename, SinkMode mode )
211    {
212    return getTextFile( null, filename, mode );
213    }
214
215  public Tap getTextFile( Fields sourceFields, String filename, SinkMode mode )
216    {
217    return getTextFile( sourceFields, Fields.ALL, filename, mode );
218    }
219
220  public abstract Tap getTextFile( Fields sourceFields, Fields sinkFields, String filename, SinkMode mode );
221
222  public Tap getDelimitedFile( Fields fields, String delimiter, String filename )
223    {
224    return getDelimitedFile( fields, false, delimiter, "\"", null, filename, SinkMode.KEEP );
225    }
226
227  public Tap getDelimitedFile( Fields fields, String delimiter, String filename, SinkMode mode )
228    {
229    return getDelimitedFile( fields, false, delimiter, "\"", null, filename, mode );
230    }
231
232  public Tap getTabDelimitedFile( Fields fields, String filename, SinkMode mode )
233    {
234    return getDelimitedFile( fields, false, "\t", "\"", null, filename, mode );
235    }
236
237  public Tap getTabDelimitedFile( Fields fields, boolean hasHeader, String filename, SinkMode mode )
238    {
239    return getDelimitedFile( fields, hasHeader, "\t", "\"", null, filename, mode );
240    }
241
242  public Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, String filename, SinkMode mode )
243    {
244    return getDelimitedFile( fields, hasHeader, delimiter, quote, null, filename, mode );
245    }
246
247  public Tap getDelimitedFile( Fields fields, String delimiter, String quote, String filename, SinkMode mode )
248    {
249    return getDelimitedFile( fields, false, delimiter, quote, null, filename, mode );
250    }
251
252  public Tap getDelimitedFile( Fields fields, String delimiter, Class[] types, String filename, SinkMode mode )
253    {
254    return getDelimitedFile( fields, false, delimiter, "\"", types, filename, mode );
255    }
256
257  public abstract Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode );
258
259  public abstract Tap getDelimitedFile( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode );
260
261  public abstract Tap getDelimitedFile( String delimiter, String quote, FieldTypeResolver fieldTypeResolver, String filename, SinkMode mode );
262
263  public abstract Tap getPartitionTap( Tap sink, Partition partition, int openThreshold );
264
265  public abstract Scheme getTestConfigDefScheme();
266
267  public abstract Scheme getTestFailScheme();
268
269  public abstract Comparator getLongComparator( boolean reverseSort );
270
271  public abstract Comparator getStringComparator( boolean reverseSort );
272
273  public abstract String getHiddenTemporaryPath();
274
275  protected String getApplicationJar()
276    {
277    // mapred.jar is for backwards compatibility with the compatibility suite
278    String property = System.getProperty( "mapred.jar", System.getProperty( AppProps.APP_JAR_PATH ) );
279
280    if( property == null || property.isEmpty() )
281      return null;
282
283    return property;
284    }
285  }