001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.platform;
023
024import java.io.IOException;
025import java.util.Comparator;
026import java.util.HashMap;
027import java.util.Map;
028
029import cascading.flow.FlowConnector;
030import cascading.flow.FlowProcess;
031import cascading.property.AppProps;
032import cascading.scheme.Scheme;
033import cascading.scheme.util.FieldTypeResolver;
034import cascading.tap.SinkMode;
035import cascading.tap.Tap;
036import cascading.tap.partition.Partition;
037import cascading.tuple.Fields;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 *
043 */
044public abstract class TestPlatform
045  {
046  private static final Logger LOG = LoggerFactory.getLogger( TestPlatform.class );
047
048  public static final String CLUSTER_TESTING_PROPERTY = "test.cluster.enabled";
049  public static final String PLATFORM_PREFIX = "platform.";
050
051  private boolean useCluster = false;
052  private boolean enableCluster = true;
053  protected int numMappers = 0;
054  protected int numReducers = 0;
055  protected int numGatherPartitions = 0;
056
057  /**
058   * Method getGlobalProperties fetches all "platform." prefixed system properties.
059   * <p/>
060   * Sub-classes of TestPlatform should use these values as overrides before returning from
061   * {@link #getProperties()}.
062   *
063   * @return a Map of properties
064   */
065  public static Map<Object, Object> getGlobalProperties()
066    {
067    HashMap<Object, Object> properties = new HashMap<Object, Object>();
068
069    for( String propertyName : System.getProperties().stringPropertyNames() )
070      {
071      if( propertyName.startsWith( PLATFORM_PREFIX ) )
072        properties.put( propertyName.substring( PLATFORM_PREFIX.length() ), System.getProperty( propertyName ) );
073      }
074
075    if( !properties.isEmpty() )
076      LOG.info( "platform property overrides: ", properties );
077
078    return properties;
079    }
080
081  protected TestPlatform()
082    {
083    enableCluster = Boolean.parseBoolean( System.getProperty( CLUSTER_TESTING_PROPERTY, Boolean.toString( enableCluster ) ) );
084    }
085
086  public String getName()
087    {
088    return getClass().getSimpleName().replaceAll( "^(.*)Platform$", "$1" ).toLowerCase();
089    }
090
091  /**
092   * Prior versions of all the planners had challenges with Merge->GroupBy and related plans.
093   */
094  @Deprecated
095  public boolean supportsGroupByAfterMerge()
096    {
097    return true;
098    }
099
100  public boolean isMapReduce()
101    {
102    return false;
103    }
104
105  public boolean isDAG()
106    {
107    return false;
108    }
109
110  public int getNumMappers()
111    {
112    return numMappers;
113    }
114
115  public void setNumMappers( int numMappers )
116    {
117    this.numMappers = numMappers;
118    }
119
120  public int getNumReducers()
121    {
122    return numReducers;
123    }
124
125  public void setNumReducers( int numReducers )
126    {
127    this.numReducers = numReducers;
128    }
129
130  public int getNumGatherPartitions()
131    {
132    return numGatherPartitions;
133    }
134
135  public void setNumGatherPartitions( int numGatherPartitions )
136    {
137    this.numGatherPartitions = numGatherPartitions;
138    }
139
140  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
141    {
142    // do nothing
143    }
144
145  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
146    {
147    // do nothing
148    }
149
150  public void setNumGatherPartitionTasks( Map<Object, Object> properties, int numReduceTasks )
151    {
152    // do nothing
153    }
154
155  public Integer getNumMapTasks( Map<Object, Object> properties )
156    {
157    return null;
158    }
159
160  public Integer getNumReduceTasks( Map<Object, Object> properties )
161    {
162    return null;
163    }
164
165  public Integer getNumGatherPartitionTasks( Map<Object, Object> properties )
166    {
167    return null;
168    }
169
170  public abstract void setUp() throws IOException;
171
172  public abstract Map<Object, Object> getProperties();
173
174  public abstract void tearDown();
175
176  public void setUseCluster( boolean useCluster )
177    {
178    this.useCluster = useCluster;
179    }
180
181  public boolean isUseCluster()
182    {
183    return enableCluster && useCluster;
184    }
185
186  public abstract void copyFromLocal( String inputFile ) throws IOException;
187
188  public abstract void copyToLocal( String outputFile ) throws IOException;
189
190  public abstract boolean remoteExists( String outputFile ) throws IOException;
191
192  public abstract boolean remoteRemove( String outputFile, boolean recursive ) throws IOException;
193
194  public abstract FlowProcess getFlowProcess();
195
196  public abstract FlowConnector getFlowConnector( Map<Object, Object> properties );
197
198  public FlowConnector getFlowConnector()
199    {
200    return getFlowConnector( getProperties() );
201    }
202
203  public abstract Tap getTap( Scheme scheme, String filename, SinkMode mode );
204
205  public Tap getTextFile( Fields sourceFields, String filename )
206    {
207    return getTextFile( sourceFields, filename, SinkMode.KEEP );
208    }
209
210  public Tap getTextFile( String filename )
211    {
212    return getTextFile( filename, SinkMode.KEEP );
213    }
214
215  public Tap getTextFile( String filename, SinkMode mode )
216    {
217    return getTextFile( null, filename, mode );
218    }
219
220  public Tap getTextFile( Fields sourceFields, String filename, SinkMode mode )
221    {
222    return getTextFile( sourceFields, Fields.ALL, filename, mode );
223    }
224
225  public abstract Tap getTextFile( Fields sourceFields, Fields sinkFields, String filename, SinkMode mode );
226
227  public Tap getDelimitedFile( Fields fields, String delimiter, String filename )
228    {
229    return getDelimitedFile( fields, false, delimiter, "\"", null, filename, SinkMode.KEEP );
230    }
231
232  public Tap getDelimitedFile( Fields fields, String delimiter, String filename, SinkMode mode )
233    {
234    return getDelimitedFile( fields, false, delimiter, "\"", null, filename, mode );
235    }
236
237  public Tap getTabDelimitedFile( Fields fields, String filename, SinkMode mode )
238    {
239    return getDelimitedFile( fields, false, "\t", "\"", null, filename, mode );
240    }
241
242  public Tap getTabDelimitedFile( Fields fields, boolean hasHeader, String filename, SinkMode mode )
243    {
244    return getDelimitedFile( fields, hasHeader, "\t", "\"", null, filename, mode );
245    }
246
247  public Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, String filename, SinkMode mode )
248    {
249    return getDelimitedFile( fields, hasHeader, delimiter, quote, null, filename, mode );
250    }
251
252  public Tap getDelimitedFile( Fields fields, String delimiter, String quote, String filename, SinkMode mode )
253    {
254    return getDelimitedFile( fields, false, delimiter, quote, null, filename, mode );
255    }
256
257  public Tap getDelimitedFile( Fields fields, String delimiter, Class[] types, String filename, SinkMode mode )
258    {
259    return getDelimitedFile( fields, false, delimiter, "\"", types, filename, mode );
260    }
261
262  public abstract Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode );
263
264  public abstract Tap getDelimitedFile( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode );
265
266  public abstract Tap getDelimitedFile( String delimiter, String quote, FieldTypeResolver fieldTypeResolver, String filename, SinkMode mode );
267
268  public abstract Tap getPartitionTap( Tap sink, Partition partition, int openThreshold );
269
270  public abstract Scheme getTestConfigDefScheme();
271
272  public abstract Scheme getTestFailScheme();
273
274  public abstract Comparator getLongComparator( boolean reverseSort );
275
276  public abstract Comparator getStringComparator( boolean reverseSort );
277
278  public abstract String getHiddenTemporaryPath();
279
280  protected String getApplicationJar()
281    {
282    // mapred.jar is for backwards compatibility with the compatibility suite
283    String property = System.getProperty( "mapred.jar", System.getProperty( AppProps.APP_JAR_PATH ) );
284
285    if( property == null || property.isEmpty() )
286      return null;
287
288    return property;
289    }
290  }