001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.platform; 022 023import java.io.IOException; 024import java.util.Comparator; 025import java.util.HashMap; 026import java.util.Map; 027 028import cascading.flow.FlowConnector; 029import cascading.flow.FlowProcess; 030import cascading.property.AppProps; 031import cascading.scheme.Scheme; 032import cascading.scheme.util.FieldTypeResolver; 033import cascading.tap.SinkMode; 034import cascading.tap.Tap; 035import cascading.tap.partition.Partition; 036import cascading.tuple.Fields; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * 042 */ 043public abstract class TestPlatform 044 { 045 private static final Logger LOG = LoggerFactory.getLogger( TestPlatform.class ); 046 047 public static final String CLUSTER_TESTING_PROPERTY = "test.cluster.enabled"; 048 public static final String PLATFORM_PREFIX = "platform."; 049 050 private boolean useCluster = false; 051 private boolean enableCluster = true; 052 protected int numMappers = 0; 053 protected int numReducers = 0; 054 protected int numGatherPartitions = 0; 055 056 /** 057 * Method getGlobalProperties fetches all "platform." prefixed system properties. 058 * <p/> 059 * Sub-classes of TestPlatform should use these values as overrides before returning from 060 * {@link #getProperties()}. 061 * 062 * @return a Map of properties 063 */ 064 public static Map<Object, Object> getGlobalProperties() 065 { 066 HashMap<Object, Object> properties = new HashMap<Object, Object>(); 067 068 for( String propertyName : System.getProperties().stringPropertyNames() ) 069 { 070 if( propertyName.startsWith( PLATFORM_PREFIX ) ) 071 properties.put( propertyName.substring( PLATFORM_PREFIX.length() ), System.getProperty( propertyName ) ); 072 } 073 074 if( !properties.isEmpty() ) 075 LOG.info( "platform property overrides: ", properties ); 076 077 return properties; 078 } 079 080 protected TestPlatform() 081 { 082 enableCluster = Boolean.parseBoolean( System.getProperty( CLUSTER_TESTING_PROPERTY, Boolean.toString( enableCluster ) ) ); 083 } 084 085 public String getName() 086 { 087 return getClass().getSimpleName().replaceAll( "^(.*)Platform$", "$1" ).toLowerCase(); 088 } 089 090 public boolean supportsGroupByAfterMerge() 091 { 092 return false; 093 } 094 095 public boolean isMapReduce() 096 { 097 return false; 098 } 099 100 public boolean isDAG() 101 { 102 return false; 103 } 104 105 public int getNumMappers() 106 { 107 return numMappers; 108 } 109 110 public void setNumMappers( int numMappers ) 111 { 112 this.numMappers = numMappers; 113 } 114 115 public int getNumReducers() 116 { 117 return numReducers; 118 } 119 120 public void setNumReducers( int numReducers ) 121 { 122 this.numReducers = numReducers; 123 } 124 125 public int getNumGatherPartitions() 126 { 127 return numGatherPartitions; 128 } 129 130 public void setNumGatherPartitions( int numGatherPartitions ) 131 { 132 this.numGatherPartitions = numGatherPartitions; 133 } 134 135 public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks ) 136 { 137 // do nothing 138 } 139 140 public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks ) 141 { 142 // do nothing 143 } 144 145 public void setNumGatherPartitionTasks( Map<Object, Object> properties, int numReduceTasks ) 146 { 147 // do nothing 148 } 149 150 public Integer getNumMapTasks( Map<Object, Object> properties ) 151 { 152 return null; 153 } 154 155 public Integer getNumReduceTasks( Map<Object, Object> properties ) 156 { 157 return null; 158 } 159 160 public Integer getNumGatherPartitionTasks( Map<Object, Object> properties ) 161 { 162 return null; 163 } 164 165 public abstract void setUp() throws IOException; 166 167 public abstract Map<Object, Object> getProperties(); 168 169 public abstract void tearDown(); 170 171 public void setUseCluster( boolean useCluster ) 172 { 173 this.useCluster = useCluster; 174 } 175 176 public boolean isUseCluster() 177 { 178 return enableCluster && useCluster; 179 } 180 181 public abstract void copyFromLocal( String inputFile ) throws IOException; 182 183 public abstract void copyToLocal( String outputFile ) throws IOException; 184 185 public abstract boolean remoteExists( String outputFile ) throws IOException; 186 187 public abstract boolean remoteRemove( String outputFile, boolean recursive ) throws IOException; 188 189 public abstract FlowProcess getFlowProcess(); 190 191 public abstract FlowConnector getFlowConnector( Map<Object, Object> properties ); 192 193 public FlowConnector getFlowConnector() 194 { 195 return getFlowConnector( getProperties() ); 196 } 197 198 public abstract Tap getTap( Scheme scheme, String filename, SinkMode mode ); 199 200 public Tap getTextFile( Fields sourceFields, String filename ) 201 { 202 return getTextFile( sourceFields, filename, SinkMode.KEEP ); 203 } 204 205 public Tap getTextFile( String filename ) 206 { 207 return getTextFile( filename, SinkMode.KEEP ); 208 } 209 210 public Tap getTextFile( String filename, SinkMode mode ) 211 { 212 return getTextFile( null, filename, mode ); 213 } 214 215 public Tap getTextFile( Fields sourceFields, String filename, SinkMode mode ) 216 { 217 return getTextFile( sourceFields, Fields.ALL, filename, mode ); 218 } 219 220 public abstract Tap getTextFile( Fields sourceFields, Fields sinkFields, String filename, SinkMode mode ); 221 222 public Tap getDelimitedFile( Fields fields, String delimiter, String filename ) 223 { 224 return getDelimitedFile( fields, false, delimiter, "\"", null, filename, SinkMode.KEEP ); 225 } 226 227 public Tap getDelimitedFile( Fields fields, String delimiter, String filename, SinkMode mode ) 228 { 229 return getDelimitedFile( fields, false, delimiter, "\"", null, filename, mode ); 230 } 231 232 public Tap getTabDelimitedFile( Fields fields, String filename, SinkMode mode ) 233 { 234 return getDelimitedFile( fields, false, "\t", "\"", null, filename, mode ); 235 } 236 237 public Tap getTabDelimitedFile( Fields fields, boolean hasHeader, String filename, SinkMode mode ) 238 { 239 return getDelimitedFile( fields, hasHeader, "\t", "\"", null, filename, mode ); 240 } 241 242 public Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, String filename, SinkMode mode ) 243 { 244 return getDelimitedFile( fields, hasHeader, delimiter, quote, null, filename, mode ); 245 } 246 247 public Tap getDelimitedFile( Fields fields, String delimiter, String quote, String filename, SinkMode mode ) 248 { 249 return getDelimitedFile( fields, false, delimiter, quote, null, filename, mode ); 250 } 251 252 public Tap getDelimitedFile( Fields fields, String delimiter, Class[] types, String filename, SinkMode mode ) 253 { 254 return getDelimitedFile( fields, false, delimiter, "\"", types, filename, mode ); 255 } 256 257 public abstract Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode ); 258 259 public abstract Tap getDelimitedFile( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode ); 260 261 public abstract Tap getDelimitedFile( String delimiter, String quote, FieldTypeResolver fieldTypeResolver, String filename, SinkMode mode ); 262 263 public abstract Tap getPartitionTap( Tap sink, Partition partition, int openThreshold ); 264 265 public abstract Scheme getTestConfigDefScheme(); 266 267 public abstract Scheme getTestFailScheme(); 268 269 public abstract Comparator getLongComparator( boolean reverseSort ); 270 271 public abstract Comparator getStringComparator( boolean reverseSort ); 272 273 public abstract String getHiddenTemporaryPath(); 274 275 protected String getApplicationJar() 276 { 277 // mapred.jar is for backwards compatibility with the compatibility suite 278 String property = System.getProperty( "mapred.jar", System.getProperty( AppProps.APP_JAR_PATH ) ); 279 280 if( property == null || property.isEmpty() ) 281 return null; 282 283 return property; 284 } 285 }