001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.platform; 023 024import java.io.IOException; 025import java.util.Comparator; 026import java.util.HashMap; 027import java.util.Map; 028 029import cascading.flow.FlowConnector; 030import cascading.flow.FlowProcess; 031import cascading.property.AppProps; 032import cascading.scheme.Scheme; 033import cascading.scheme.util.FieldTypeResolver; 034import cascading.tap.SinkMode; 035import cascading.tap.Tap; 036import cascading.tap.partition.Partition; 037import cascading.tuple.Fields; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * 043 */ 044public abstract class TestPlatform 045 { 046 private static final Logger LOG = LoggerFactory.getLogger( TestPlatform.class ); 047 048 public static final String CLUSTER_TESTING_PROPERTY = "test.cluster.enabled"; 049 public static final String PLATFORM_PREFIX = "platform."; 050 051 private boolean useCluster = false; 052 private boolean enableCluster = true; 053 protected int numMappers = 0; 054 protected int numReducers = 0; 055 protected int numGatherPartitions = 0; 056 057 /** 058 * Method getGlobalProperties fetches all "platform." prefixed system properties. 059 * <p/> 060 * Sub-classes of TestPlatform should use these values as overrides before returning from 061 * {@link #getProperties()}. 062 * 063 * @return a Map of properties 064 */ 065 public static Map<Object, Object> getGlobalProperties() 066 { 067 HashMap<Object, Object> properties = new HashMap<Object, Object>(); 068 069 for( String propertyName : System.getProperties().stringPropertyNames() ) 070 { 071 if( propertyName.startsWith( PLATFORM_PREFIX ) ) 072 properties.put( propertyName.substring( PLATFORM_PREFIX.length() ), System.getProperty( propertyName ) ); 073 } 074 075 if( !properties.isEmpty() ) 076 LOG.info( "platform property overrides: ", properties ); 077 078 return properties; 079 } 080 081 protected TestPlatform() 082 { 083 enableCluster = Boolean.parseBoolean( System.getProperty( CLUSTER_TESTING_PROPERTY, Boolean.toString( enableCluster ) ) ); 084 } 085 086 public String getName() 087 { 088 return getClass().getSimpleName().replaceAll( "^(.*)Platform$", "$1" ).toLowerCase(); 089 } 090 091 /** 092 * Prior versions of all the planners had challenges with Merge->GroupBy and related plans. 093 */ 094 @Deprecated 095 public boolean supportsGroupByAfterMerge() 096 { 097 return true; 098 } 099 100 public boolean isMapReduce() 101 { 102 return false; 103 } 104 105 public boolean isDAG() 106 { 107 return false; 108 } 109 110 public int getNumMappers() 111 { 112 return numMappers; 113 } 114 115 public void setNumMappers( int numMappers ) 116 { 117 this.numMappers = numMappers; 118 } 119 120 public int getNumReducers() 121 { 122 return numReducers; 123 } 124 125 public void setNumReducers( int numReducers ) 126 { 127 this.numReducers = numReducers; 128 } 129 130 public int getNumGatherPartitions() 131 { 132 return numGatherPartitions; 133 } 134 135 public void setNumGatherPartitions( int numGatherPartitions ) 136 { 137 this.numGatherPartitions = numGatherPartitions; 138 } 139 140 public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks ) 141 { 142 // do nothing 143 } 144 145 public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks ) 146 { 147 // do nothing 148 } 149 150 public void setNumGatherPartitionTasks( Map<Object, Object> properties, int numReduceTasks ) 151 { 152 // do nothing 153 } 154 155 public Integer getNumMapTasks( Map<Object, Object> properties ) 156 { 157 return null; 158 } 159 160 public Integer getNumReduceTasks( Map<Object, Object> properties ) 161 { 162 return null; 163 } 164 165 public Integer getNumGatherPartitionTasks( Map<Object, Object> properties ) 166 { 167 return null; 168 } 169 170 public abstract void setUp() throws IOException; 171 172 public abstract Map<Object, Object> getProperties(); 173 174 public abstract void tearDown(); 175 176 public void setUseCluster( boolean useCluster ) 177 { 178 this.useCluster = useCluster; 179 } 180 181 public boolean isUseCluster() 182 { 183 return enableCluster && useCluster; 184 } 185 186 public abstract void copyFromLocal( String inputFile ) throws IOException; 187 188 public abstract void copyToLocal( String outputFile ) throws IOException; 189 190 public abstract boolean remoteExists( String outputFile ) throws IOException; 191 192 public abstract boolean remoteRemove( String outputFile, boolean recursive ) throws IOException; 193 194 public abstract FlowProcess getFlowProcess(); 195 196 public abstract FlowConnector getFlowConnector( Map<Object, Object> properties ); 197 198 public FlowConnector getFlowConnector() 199 { 200 return getFlowConnector( getProperties() ); 201 } 202 203 public abstract Tap getTap( Scheme scheme, String filename, SinkMode mode ); 204 205 public Tap getTextFile( Fields sourceFields, String filename ) 206 { 207 return getTextFile( sourceFields, filename, SinkMode.KEEP ); 208 } 209 210 public Tap getTextFile( String filename ) 211 { 212 return getTextFile( filename, SinkMode.KEEP ); 213 } 214 215 public Tap getTextFile( String filename, SinkMode mode ) 216 { 217 return getTextFile( null, filename, mode ); 218 } 219 220 public Tap getTextFile( Fields sourceFields, String filename, SinkMode mode ) 221 { 222 return getTextFile( sourceFields, Fields.ALL, filename, mode ); 223 } 224 225 public abstract Tap getTextFile( Fields sourceFields, Fields sinkFields, String filename, SinkMode mode ); 226 227 public Tap getDelimitedFile( Fields fields, String delimiter, String filename ) 228 { 229 return getDelimitedFile( fields, false, delimiter, "\"", null, filename, SinkMode.KEEP ); 230 } 231 232 public Tap getDelimitedFile( Fields fields, String delimiter, String filename, SinkMode mode ) 233 { 234 return getDelimitedFile( fields, false, delimiter, "\"", null, filename, mode ); 235 } 236 237 public Tap getTabDelimitedFile( Fields fields, String filename, SinkMode mode ) 238 { 239 return getDelimitedFile( fields, false, "\t", "\"", null, filename, mode ); 240 } 241 242 public Tap getTabDelimitedFile( Fields fields, boolean hasHeader, String filename, SinkMode mode ) 243 { 244 return getDelimitedFile( fields, hasHeader, "\t", "\"", null, filename, mode ); 245 } 246 247 public Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, String filename, SinkMode mode ) 248 { 249 return getDelimitedFile( fields, hasHeader, delimiter, quote, null, filename, mode ); 250 } 251 252 public Tap getDelimitedFile( Fields fields, String delimiter, String quote, String filename, SinkMode mode ) 253 { 254 return getDelimitedFile( fields, false, delimiter, quote, null, filename, mode ); 255 } 256 257 public Tap getDelimitedFile( Fields fields, String delimiter, Class[] types, String filename, SinkMode mode ) 258 { 259 return getDelimitedFile( fields, false, delimiter, "\"", types, filename, mode ); 260 } 261 262 public abstract Tap getDelimitedFile( Fields fields, boolean hasHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode ); 263 264 public abstract Tap getDelimitedFile( Fields fields, boolean skipHeader, boolean writeHeader, String delimiter, String quote, Class[] types, String filename, SinkMode mode ); 265 266 public abstract Tap getDelimitedFile( String delimiter, String quote, FieldTypeResolver fieldTypeResolver, String filename, SinkMode mode ); 267 268 public abstract Tap getPartitionTap( Tap sink, Partition partition, int openThreshold ); 269 270 public abstract Scheme getTestConfigDefScheme(); 271 272 public abstract Scheme getTestFailScheme(); 273 274 public abstract Comparator getLongComparator( boolean reverseSort ); 275 276 public abstract Comparator getStringComparator( boolean reverseSort ); 277 278 public abstract String getHiddenTemporaryPath(); 279 280 protected String getApplicationJar() 281 { 282 // mapred.jar is for backwards compatibility with the compatibility suite 283 String property = System.getProperty( "mapred.jar", System.getProperty( AppProps.APP_JAR_PATH ) ); 284 285 if( property == null || property.isEmpty() ) 286 return null; 287 288 return property; 289 } 290 }